[jira] [Commented] (FLINK-4609) Remove redundant check for null in CrossOperator
[ https://issues.apache.org/jira/browse/FLINK-4609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479200#comment-15479200 ]

ASF GitHub Bot commented on FLINK-4609:
---------------------------------------

GitHub user apivovarov opened a pull request:

    https://github.com/apache/flink/pull/2490

    [FLINK-4609] Remove redundant check for null in CrossOperator

    https://issues.apache.org/jira/browse/FLINK-4609

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4609

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2490

commit 272007fc8f350b1b998f28762aade6760b588c73
Author: Alexander Pivovarov
Date:   2016-09-10T05:24:43Z

    [FLINK-4609] Remove redundant check for null in CrossOperator

> Remove redundant check for null in CrossOperator
> ------------------------------------------------
>
>                 Key: FLINK-4609
>                 URL: https://issues.apache.org/jira/browse/FLINK-4609
>             Project: Flink
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.1.2
>            Reporter: Alexander Pivovarov
>            Priority: Trivial
>
> CrossOperator checks input1 and input2 for null after they were dereferenced

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...
GitHub user apivovarov opened a pull request:

    https://github.com/apache/flink/pull/2490

    [FLINK-4609] Remove redundant check for null in CrossOperator

    https://issues.apache.org/jira/browse/FLINK-4609

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4609

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2490

commit 272007fc8f350b1b998f28762aade6760b588c73
Author: Alexander Pivovarov
Date:   2016-09-10T05:24:43Z

    [FLINK-4609] Remove redundant check for null in CrossOperator

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Created] (FLINK-4609) Remove redundant check for null in CrossOperator
Alexander Pivovarov created FLINK-4609:
------------------------------------------

             Summary: Remove redundant check for null in CrossOperator
                 Key: FLINK-4609
                 URL: https://issues.apache.org/jira/browse/FLINK-4609
             Project: Flink
          Issue Type: Bug
          Components: Java API
    Affects Versions: 1.1.2
            Reporter: Alexander Pivovarov
            Priority: Trivial


CrossOperator checks input1 and input2 for null after they were dereferenced
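The pattern this issue removes can be illustrated with a minimal sketch. The class and method names below are invented for illustration and are not the actual CrossOperator code: a null check placed after a dereference is dead code, because a null argument would already have thrown a NullPointerException at the dereference.

```java
import java.util.List;

// Illustrative only -- not the actual CrossOperator source.
public class CrossSketch {
    static String describe(List<Integer> input1, List<Integer> input2) {
        // The dereference happens here: a null input already throws NPE...
        int size = input1.size() + input2.size();
        // ...so this later null check can never fire and is redundant.
        if (input1 == null || input2 == null) {
            throw new NullPointerException("input must not be null");
        }
        return "total elements: " + size;
    }

    public static void main(String[] args) {
        System.out.println(describe(List.of(1, 2), List.of(3)));
    }
}
```

The fix in such cases is either to delete the check or to move it before the first dereference, depending on whether an explicit error message is wanted.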
[jira] [Updated] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-4608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Pivovarov updated FLINK-4608:
---------------------------------------
    Description: Max/Min AggregationFunction use & instead of &&. Usually we use short-circuit logic in if operators in Java  (was: Max/Min AggregationFunction use & instead of &&. Usually we use && in if blocks)

> Use short-circuit AND in Max/Min AggregationFunction
> ----------------------------------------------------
>
>                 Key: FLINK-4608
>                 URL: https://issues.apache.org/jira/browse/FLINK-4608
>             Project: Flink
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.1.2
>            Reporter: Alexander Pivovarov
>            Priority: Trivial
>
> Max/Min AggregationFunction use & instead of &&. Usually we use short-circuit
> logic in if operators in Java
[jira] [Updated] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-4608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Pivovarov updated FLINK-4608:
---------------------------------------
    Description: Max/Min AggregationFunction use & instead of &&. Usually we use && in if blocks  (was: Max/Min AggregationFunction use & instead of &&)

> Use short-circuit AND in Max/Min AggregationFunction
> ----------------------------------------------------
>
>                 Key: FLINK-4608
>                 URL: https://issues.apache.org/jira/browse/FLINK-4608
>             Project: Flink
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.1.2
>            Reporter: Alexander Pivovarov
>            Priority: Trivial
>
> Max/Min AggregationFunction use & instead of &&. Usually we use && in if
> blocks
[jira] [Commented] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-4608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479159#comment-15479159 ]

ASF GitHub Bot commented on FLINK-4608:
---------------------------------------

GitHub user apivovarov opened a pull request:

    https://github.com/apache/flink/pull/2489

    [FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction

    https://issues.apache.org/jira/browse/FLINK-4608

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4608

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2489.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2489

commit ceb3aa05bae3d05b9f6c5a7a55e6e43fc91a9450
Author: Alexander Pivovarov
Date:   2016-09-10T04:52:36Z

    [FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction

> Use short-circuit AND in Max/Min AggregationFunction
> ----------------------------------------------------
>
>                 Key: FLINK-4608
>                 URL: https://issues.apache.org/jira/browse/FLINK-4608
>             Project: Flink
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.1.2
>            Reporter: Alexander Pivovarov
>            Priority: Trivial
>
> Max/Min AggregationFunction use & instead of &&
[GitHub] flink pull request #2489: [FLINK-4608] Use short-circuit AND in Max/Min Aggr...
GitHub user apivovarov opened a pull request:

    https://github.com/apache/flink/pull/2489

    [FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction

    https://issues.apache.org/jira/browse/FLINK-4608

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4608

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2489.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2489

commit ceb3aa05bae3d05b9f6c5a7a55e6e43fc91a9450
Author: Alexander Pivovarov
Date:   2016-09-10T04:52:36Z

    [FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction
[jira] [Updated] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction
[ https://issues.apache.org/jira/browse/FLINK-4608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Pivovarov updated FLINK-4608:
---------------------------------------
    Description: Max/Min AggregationFunction use & instead of &&

> Use short-circuit AND in Max/Min AggregationFunction
> ----------------------------------------------------
>
>                 Key: FLINK-4608
>                 URL: https://issues.apache.org/jira/browse/FLINK-4608
>             Project: Flink
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.1.2
>            Reporter: Alexander Pivovarov
>            Priority: Trivial
>
> Max/Min AggregationFunction use & instead of &&
[jira] [Created] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction
Alexander Pivovarov created FLINK-4608:
------------------------------------------

             Summary: Use short-circuit AND in Max/Min AggregationFunction
                 Key: FLINK-4608
                 URL: https://issues.apache.org/jira/browse/FLINK-4608
             Project: Flink
          Issue Type: Bug
          Components: Java API
    Affects Versions: 1.1.2
            Reporter: Alexander Pivovarov
            Priority: Trivial
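The difference the issue describes is easy to demonstrate with a small, hypothetical snippet (not the actual AggregationFunction code): the non-short-circuit `&` evaluates both operands even when the left side is already false, which matters whenever the right-hand operand can throw.

```java
// Illustrative sketch, not Flink code: why && is preferred in guard conditions.
public class ShortCircuit {
    static boolean nonEmptyAnd(String s) {
        // With & both operands are always evaluated:
        // s.length() throws NullPointerException when s == null.
        return s != null & s.length() > 0;
    }

    static boolean nonEmptyAndAnd(String s) {
        // With && the length check is skipped when s == null.
        return s != null && s.length() > 0;
    }

    public static void main(String[] args) {
        System.out.println(nonEmptyAndAnd(null)); // false, no exception
        try {
            nonEmptyAnd(null);
        } catch (NullPointerException e) {
            System.out.println("& evaluated s.length() on null");
        }
    }
}
```

In the Max/Min aggregation functions the operands are plain comparisons with no side effects, so the change is about idiom and a possible saved evaluation rather than correctness.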
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2485#discussion_r78270898

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.stream
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.table.TableEnvironment
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.junit.Assert.assertEquals
    +import org.junit._
    +
    +class ExplainStreamTest
    +  extends StreamingMultipleProgramsTestBase {
    +
    +  val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile
    +
    +  @Test
    +  def testFilter() : Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    val table = env.fromElements((1, "hello"))
    +      .toTable(tEnv, 'a, 'b)
    +      .filter("a % 2 = 0")
    +
    +    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
    +    val source = scala.io.Source.fromFile(testFilePath +
    +      "../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n")
    +    assertEquals(result, source)
    +  }
    +
    +  @Test
    +  def testUnion() : Unit = {
--- End diff --

    No need for a space after `()` on lines 35 and 50.
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2485#discussion_r78270846

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---
    @@ -271,5 +271,21 @@ abstract class StreamTableEnvironment(
         }
       }
     
    +  /*
    +    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
    +    * the result of the given [[Table]].
    +    *
    +    * @param table The table for which the AST and execution plan will be returned.
    +    * @param extended Flag to include detailed optimizer estimates.
    +    */
    +  def explain(table: Table): String = {
    +
    +    val ast = RelOptUtil.toString(table.getRelNode)
    +
    +    s"== Abstract Syntax Tree ==" +
    +      System.lineSeparator +
    +      s"$ast"
--- End diff --

    No need for the `s` interpolator on line 285.
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2485#discussion_r78270826

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---
    @@ -271,5 +271,21 @@ abstract class StreamTableEnvironment(
         }
       }
     
    +  /*
    +    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
    +    * the result of the given [[Table]].
    +    *
    +    * @param table The table for which the AST and execution plan will be returned.
    +    * @param extended Flag to include detailed optimizer estimates.
    +    */
    +  def explain(table: Table): String = {
    +
    +    val ast = RelOptUtil.toString(table.getRelNode)
    +
    +    s"== Abstract Syntax Tree ==" +
    +      System.lineSeparator +
    +      s"$ast"
--- End diff --

    Maybe this?
    ```
    s"== Abstract Syntax Tree ==${System.lineSeparator}$ast"
    ```
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2485#discussion_r78270796

    --- Diff: docs/dev/table_api.md ---
    @@ -2457,3 +2457,27 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r
     By default, the Table API supports `null` values. Null handling can be disabled to improve performance by setting the `nullCheck` property in the `TableConfig` to `false`.
     
     {% top %}
    +
    +Explaining a Table
    +------------------
    +
    +The Table API provides a mechanism to describe the graph of operations that leads to the resulting output. This is done through the `TableEnvironment#explain(table)` method. It returns a string describing two graphs: the Abstract Syntax Tree of the relational algebra query and Flink's execution plan of the equivalent Flink job.
    +
    +Table `explain` is supported for both `BatchTableEnvironment` and `StreamTableEnvironment`. Currently `StreamTableEnvironment` doesn't support the explanation of the execution plan.
    +
    +{% highlight scala %}
    +  val env = StreamExecutionEnvironment.getExecutionEnvironment
    +  val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +  val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
    +  val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
    +  val table = table1.unionAll(table2)
    +
    +  val explanation:String = tEnv.explain(table)
--- End diff --

    Put a space before `String`.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479106#comment-15479106 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78270764

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +	private final StreamExecutionEnvironment executionEnvironment;
    +	private final Map<String, DataStream<?>> dataStreams;
    +	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
    +	private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +	public Map<String, DataStream<?>> getDataStreams() {
    +		return this.dataStreams;
    +	}
    +
    +	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
    +		return this.dataStreamSchemas;
    +	}
    +
    +	public boolean isStreamDefined(String streamId) {
    +		return dataStreams.containsKey(streamId);
    +	}
    +
    +	public Map<String, Class<?>> getExtensions() {
    +		return this.extensions;
    +	}
    +
    +	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +		if (!isStreamDefined(streamId)) {
    +			throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
    +		}
    +	}
    +
    +	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +		this.executionEnvironment = streamExecutionEnvironment;
    +		this.dataStreams = new HashMap<>();
    +		this.dataStreamSchemas = new HashMap<>();
    +	}
    +
    +	public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> inStream, String... fieldNames) {
    +		SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
    +		return environment.from(streamId, inStream, fieldNames);
    +	}
    +
    +	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> inStream, String... fieldNames) {
    +		this.registerStream(streamId, inStream, fieldNames);
    +		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +	}
    +
    +	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
    +		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +	}
    +
    +	public SiddhiStream.UnionSiddhiStream union(String firstStreamId, String... unionStreamIds) {
    +		return new SiddhiStream.SingleSiddhiStream(firstStreamId, this).union(unionStreamIds);
    +	}
    +
    +	public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
    +		if (isStreamDefined(streamId)) {
    +			throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
    +		}
    +		dataStreams.put(streamId, dataStream);
    +		SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
    +		schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
    +		dataStreamSchemas.put(streamId, schema);
    +	}
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479103#comment-15479103 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78270757

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +	private final StreamExecutionEnvironment executionEnvironment;
    +	private final Map<String, DataStream<?>> dataStreams;
    +	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
    +	private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +	public Map<String, DataStream<?>> getDataStreams() {
    +		return this.dataStreams;
    +	}
    +
    +	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
    +		return this.dataStreamSchemas;
    +	}
    +
    +	public boolean isStreamDefined(String streamId) {
    +		return dataStreams.containsKey(streamId);
    +	}
    +
    +	public Map<String, Class<?>> getExtensions() {
    +		return this.extensions;
    +	}
    +
    +	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +		if (!isStreamDefined(streamId)) {
    +			throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
    +		}
    +	}
    +
    +	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +		this.executionEnvironment = streamExecutionEnvironment;
    +		this.dataStreams = new HashMap<>();
    +		this.dataStreamSchemas = new HashMap<>();
    +	}
    +
    +	public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> inStream, String... fieldNames) {
    +		SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
    +		return environment.from(streamId, inStream, fieldNames);
    +	}
    +
    +	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> inStream, String... fieldNames) {
    +		this.registerStream(streamId, inStream, fieldNames);
    +		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +	}
    +
    +	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
    +		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +	}
    +
    +	public SiddhiStream.UnionSiddhiStream union(String firstStreamId, String... unionStreamIds) {
    +		return new SiddhiStream.SingleSiddhiStream(firstStreamId, this).union(unionStreamIds);
    +	}
    +
    +	public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
    +		if (isStreamDefined(streamId)) {
    +			throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
    +		}
    +		dataStreams.put(streamId, dataStream);
    +		SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
    +		schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
    +		dataStreamSchemas.put(streamId, schema);
    +	}
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479102#comment-15479102 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78270750

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +	private final StreamExecutionEnvironment executionEnvironment;
    +	private final Map<String, DataStream<?>> dataStreams;
    +	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
    +	private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +	public Map<String, DataStream<?>> getDataStreams() {
    +		return this.dataStreams;
    +	}
    +
    +	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
    +		return this.dataStreamSchemas;
    +	}
    +
    +	public boolean isStreamDefined(String streamId) {
    +		return dataStreams.containsKey(streamId);
    +	}
    +
    +	public Map<String, Class<?>> getExtensions() {
    +		return this.extensions;
    +	}
    +
    +	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +		if (!isStreamDefined(streamId)) {
    +			throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
    +		}
    +	}
    +
    +	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +		this.executionEnvironment = streamExecutionEnvironment;
    +		this.dataStreams = new HashMap<>();
    +		this.dataStreamSchemas = new HashMap<>();
--- End diff --

    Lines 64 and 65 can be removed. Add `= new HashMap<>();` to lines 36 and 37, similarly to what was done on line 38.

> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
>                 Key: FLINK-4520
>                 URL: https://issues.apache.org/jira/browse/FLINK-4520
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Hao Chen
>              Labels: cep, library, patch-available
>             Fix For: 1.2.0
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use open-source Complex Event Processing engine (CEP) released as a Java library under the `Apache Software License v2.0`. Siddhi CEP processes events generated by various event sources, analyses them, and notifies appropriate complex events according to the user-specified queries.
> It would be very helpful for Flink users (especially streaming application developers) to provide a library to run Siddhi CEP queries directly in a Flink streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
>   * Filter
>   * Join
>   * Aggregation
>   * Group by
>   * Having
>   * Window
>   * Conditions and Expressions
>   * Pattern processing
>   * Sequence processing
>   * Event Tables
>   ...
> * Provide easy-to-use Siddhi
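The review suggestion above (initialize fields at their declaration instead of in the constructor) can be sketched with a small, hypothetical class; the field names echo the reviewed code, but this is not the actual SiddhiCEP source:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal version of the reviewer's suggestion: initialize
// the maps at the declaration site, so the constructor stays focused on
// the arguments it actually receives.
public class FieldInit {
    // Initialized at declaration, matching the style already used for
    // the third map in the reviewed class.
    private final Map<String, String> dataStreams = new HashMap<>();
    private final Map<String, String> dataStreamSchemas = new HashMap<>();

    public int totalEntries() {
        return dataStreams.size() + dataStreamSchemas.size();
    }

    public static void main(String[] args) {
        System.out.println(new FieldInit().totalEntries()); // 0
    }
}
```

Either placement is semantically equivalent for fields that never depend on constructor arguments; declaration-site initialization simply keeps the two styles consistent within one class.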
[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2487#discussion_r78270757 --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+	private final StreamExecutionEnvironment executionEnvironment;
+	private final Map<String, DataStream<?>> dataStreams;
+	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
+	private final Map<String, Class<?>> extensions = new HashMap<>();
+
+	public Map<String, DataStream<?>> getDataStreams() {
+		return this.dataStreams;
+	}
+
+	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+		return this.dataStreamSchemas;
+	}
+
+	public boolean isStreamDefined(String streamId) {
+		return dataStreams.containsKey(streamId);
+	}
+
+	public Map<String, Class<?>> getExtensions() {
+		return this.extensions;
+	}
+
+	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+		if (!isStreamDefined(streamId)) {
+			throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
+		}
+	}
+
+	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+		this.executionEnvironment = streamExecutionEnvironment;
+		this.dataStreams = new HashMap<>();
+		this.dataStreamSchemas = new HashMap<>();
+	}
+
+	public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> inStream, String... fieldNames) {
+		SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
+		return environment.from(streamId, inStream, fieldNames);
+	}
+
+	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> inStream, String... fieldNames) {
+		this.registerStream(streamId, inStream, fieldNames);
+		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+	}
+
+	public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId) {
+		return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
+	}
+
+	public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId, String... unionStreamIds) {
+		return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId, this).union(unionStreamIds);
+	}
+
+	public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) {
+		if (isStreamDefined(streamId)) {
+			throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
+		}
+		dataStreams.put(streamId, dataStream);
+		SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
+		schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
+		dataStreamSchemas.put(streamId, schema);
+	}
+
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return executionEnvironment;
+	}
+
+	public void registerExtension(String extensionName, Class<?> extensionClass) {
+
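The `registerStream`/`checkStreamDefined` pair in the diff above follows a fail-fast registry pattern: duplicate registrations and queries against unknown streams are both rejected immediately. A simplified, dependency-free sketch of that pattern (`StreamRegistry` and the use of `Object` in place of `DataStream` are illustrative stand-ins, not the actual flink-siddhi classes):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the register/check pattern from the SiddhiCEP diff.
class StreamRegistry {
	private final Map<String, Object> dataStreams = new HashMap<>();

	boolean isStreamDefined(String streamId) {
		return dataStreams.containsKey(streamId);
	}

	// Fail fast on duplicate registration (stands in for DuplicatedStreamException).
	void registerStream(String streamId, Object dataStream) {
		if (isStreamDefined(streamId)) {
			throw new IllegalStateException("Input stream: " + streamId + " already exists");
		}
		dataStreams.put(streamId, dataStream);
	}

	// Reject queries against unknown streams (stands in for UndefinedStreamException).
	void checkStreamDefined(String streamId) {
		if (!isStreamDefined(streamId)) {
			throw new IllegalStateException("Stream (streamId: " + streamId + ") not defined");
		}
	}
}
```

Checking at registration time means a misconfigured stream id surfaces when the pipeline is built, not when the first event arrives.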
[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2487#discussion_r78270750 --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+	private final StreamExecutionEnvironment executionEnvironment;
+	private final Map<String, DataStream<?>> dataStreams;
+	private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
+	private final Map<String, Class<?>> extensions = new HashMap<>();
+
+	public Map<String, DataStream<?>> getDataStreams() {
+		return this.dataStreams;
+	}
+
+	public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
+		return this.dataStreamSchemas;
+	}
+
+	public boolean isStreamDefined(String streamId) {
+		return dataStreams.containsKey(streamId);
+	}
+
+	public Map<String, Class<?>> getExtensions() {
+		return this.extensions;
+	}
+
+	public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+		if (!isStreamDefined(streamId)) {
+			throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
+		}
+	}
+
+	public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+		this.executionEnvironment = streamExecutionEnvironment;
+		this.dataStreams = new HashMap<>();
+		this.dataStreamSchemas = new HashMap<>();
--- End diff --

Lines 64 and 65 can be removed. Add `= new HashMap<>();` to lines 36 and 37, as was done on line 38.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
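The review comment above amounts to initializing all three maps at their field declarations, matching the style already used for `extensions`, so the constructor assignments become unnecessary. A minimal sketch of the suggested shape (simplified stand-in types, not the actual `SiddhiCEP` class):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the reviewer's suggestion: every map is initialized where it is
// declared, so the constructor only stores the environment reference.
class SiddhiCEPSketch {
	private final Map<String, Object> dataStreams = new HashMap<>();
	private final Map<String, Object> dataStreamSchemas = new HashMap<>();
	private final Map<String, Class<?>> extensions = new HashMap<>();

	private final Object executionEnvironment; // stand-in for StreamExecutionEnvironment

	SiddhiCEPSketch(Object streamExecutionEnvironment) {
		this.executionEnvironment = streamExecutionEnvironment;
	}

	boolean isStreamDefined(String streamId) {
		return dataStreams.containsKey(streamId);
	}
}
```

Beyond brevity, declaration-site initialization keeps the fields valid no matter which constructor is added later.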
[jira] [Commented] (FLINK-4607) Close FileInputStream in ParameterTool and other
[ https://issues.apache.org/jira/browse/FLINK-4607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479062#comment-15479062 ] ASF GitHub Bot commented on FLINK-4607: --- GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2488 [FLINK-4607] Close FileInputStream in ParameterTool and other https://issues.apache.org/jira/browse/FLINK-4607 You can merge this pull request into a Git repository by running: $ git pull https://github.com/apivovarov/flink FLINK-4607 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2488.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2488 commit dc0e636f630f8ab96d7cc71ede0a5a3ea5ce24a4 Author: Alexander Pivovarov Date: 2016-09-10T03:32:28Z [FLINK-4607] Close FileInputStream in ParameterTool and other > Close FileInputStream in ParameterTool and other > ------------------------------------------------ > > Key: FLINK-4607 > URL: https://issues.apache.org/jira/browse/FLINK-4607 > Project: Flink > Issue Type: Bug > Affects Versions: 1.1.2 > Reporter: Alexander Pivovarov > Priority: Trivial > > ParameterTool and some tests do not close FileInputStream > {code} > flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java > flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java > flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java > flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java > flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2488: [FLINK-4607] Close FileInputStream in ParameterToo...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2488 [FLINK-4607] Close FileInputStream in ParameterTool and other https://issues.apache.org/jira/browse/FLINK-4607 You can merge this pull request into a Git repository by running: $ git pull https://github.com/apivovarov/flink FLINK-4607 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2488.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2488 commit dc0e636f630f8ab96d7cc71ede0a5a3ea5ce24a4 Author: Alexander Pivovarov Date: 2016-09-10T03:32:28Z [FLINK-4607] Close FileInputStream in ParameterTool and other --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4607) Close FileInputStream in ParameterTool and other
Alexander Pivovarov created FLINK-4607: -- Summary: Close FileInputStream in ParameterTool and other Key: FLINK-4607 URL: https://issues.apache.org/jira/browse/FLINK-4607 Project: Flink Issue Type: Bug Affects Versions: 1.1.2 Reporter: Alexander Pivovarov Priority: Trivial ParameterTool and some tests do not close FileInputStream {code} flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
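The standard fix for this kind of leak is try-with-resources (Java 7+), which closes the stream on both normal and exceptional exit. A minimal sketch assuming a properties-style file; `PropsLoader` is a hypothetical illustration, not the actual `ParameterTool` code:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

class PropsLoader {
	// Leak-prone: if Properties.load(...) throws, the FileInputStream is never closed.
	static Properties loadLeaky(File file) throws IOException {
		Properties props = new Properties();
		props.load(new FileInputStream(file));
		return props;
	}

	// Fixed: try-with-resources closes the stream on every exit path.
	static Properties load(File file) throws IOException {
		Properties props = new Properties();
		try (InputStream in = new FileInputStream(file)) {
			props.load(in);
		}
		return props;
	}
}
```

The same pattern applies to the test files listed above; wrapping each `FileInputStream` in a try-with-resources block removes the leak without changing behavior.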
[jira] [Created] (FLINK-4606) Integrate the new ResourceManager with the existing FlinkResourceManager
zhangjing created FLINK-4606: Summary: Integrate the new ResourceManager with the existing FlinkResourceManager Key: FLINK-4606 URL: https://issues.apache.org/jira/browse/FLINK-4606 Project: Flink Issue Type: Sub-task Components: Cluster Management Reporter: zhangjing Assignee: zhangjing Integrate the new ResourceManager with the existing FlinkResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478991#comment-15478991 ] Hao Chen commented on FLINK-4520: - Hi Flink community, The PR is available now, could anybody help assign the ticket to me and review the PR? Any response is highly appreciated. Thanks, Hao > Integrate Siddhi as a lightweight CEP Library > --------------------------------------------- > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP > Affects Versions: 1.2.0 > Reporter: Hao Chen > Labels: cep, library, patch-available > Fix For: 1.2.0 > > > h1. flink-siddhi proposal > h2. Abstraction > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user specified queries. > It would be very helpful for flink users (especially streaming application > developer) to provide a library to run Siddhi CEP query directly in Flink > streaming application. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi > h2. Features > * Integrate Siddhi CEP as an stream operator (i.e. > `TupleStreamSiddhiOperator`), supporting rich CEP features like > * Filter > * Join > * Aggregation > * Group by > * Having > * Window > * Conditions and Expressions > * Pattern processing > * Sequence processing > * Event Tables > ... > * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See > `SiddhiCEP` and `SiddhiStream`) > * Register Flink DataStream associating native type information with > Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. 
> * Connect with single or multiple Flink DataStreams with Siddhi CEP > Execution Plan > * Return output stream as DataStream with type intelligently inferred > from Siddhi Stream Schema > * Integrate siddhi runtime state management with Flink state (See > `AbstractSiddhiOperator`) > * Support siddhi plugin management to extend CEP functions. (See > `SiddhiCEP#registerExtension`) > h2. Test Cases > * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: > https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java > h2. Example > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); > cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); > cep.registerStream("inputStream1", input1, "id", "name", > "price","timestamp"); > cep.registerStream("inputStream2", input2, "id", "name", > "price","timestamp"); > DataStream> output = cep > .from("inputStream1").union("inputStream2") > .sql( > "from every s1 = inputStream1[id == 2] " > + " -> s2 = inputStream2[id == 3] " > + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as > name_2 , custom:plus(s1.price,s2.price) as price" > + "insert into outputStream" > ) > .returns("outputStream"); > env.execute(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Chen updated FLINK-4520: Labels: cep library patch-available (was: cep library) Fix Version/s: 1.2.0 > Integrate Siddhi as a lightweight CEP Library > - > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP >Affects Versions: 1.2.0 >Reporter: Hao Chen > Labels: cep, library, patch-available > Fix For: 1.2.0 > > > h1. flink-siddhi proposal > h2. Abstraction > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user specified queries. > It would be very helpful for flink users (especially streaming application > developer) to provide a library to run Siddhi CEP query directly in Flink > streaming application. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi > h2. Features > * Integrate Siddhi CEP as an stream operator (i.e. > `TupleStreamSiddhiOperator`), supporting rich CEP features like > * Filter > * Join > * Aggregation > * Group by > * Having > * Window > * Conditions and Expressions > * Pattern processing > * Sequence processing > * Event Tables > ... > * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See > `SiddhiCEP` and `SiddhiStream`) > * Register Flink DataStream associating native type information with > Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. 
> * Connect with single or multiple Flink DataStreams with Siddhi CEP > Execution Plan > * Return output stream as DataStream with type intelligently inferred > from Siddhi Stream Schema > * Integrate siddhi runtime state management with Flink state (See > `AbstractSiddhiOperator`) > * Support siddhi plugin management to extend CEP functions. (See > `SiddhiCEP#registerExtension`) > h2. Test Cases > * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: > https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java > h2. Example > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); > cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); > cep.registerStream("inputStream1", input1, "id", "name", > "price","timestamp"); > cep.registerStream("inputStream2", input2, "id", "name", > "price","timestamp"); > DataStream> output = cep > .from("inputStream1").union("inputStream2") > .sql( > "from every s1 = inputStream1[id == 2] " > + " -> s2 = inputStream2[id == 3] " > + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as > name_2 , custom:plus(s1.price,s2.price) as price" > + "insert into outputStream" > ) > .returns("outputStream"); > env.execute(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478961#comment-15478961 ] ASF GitHub Bot commented on FLINK-4520: --- Github user haoch commented on the issue: https://github.com/apache/flink/pull/2486 Closed this PR for cleaning commit history and recreated another PR #2487 > Integrate Siddhi as a lightweight CEP Library > - > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP >Affects Versions: 1.2.0 >Reporter: Hao Chen > Labels: cep, library > > h1. flink-siddhi proposal > h2. Abstraction > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user specified queries. > It would be very helpful for flink users (especially streaming application > developer) to provide a library to run Siddhi CEP query directly in Flink > streaming application. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi > h2. Features > * Integrate Siddhi CEP as an stream operator (i.e. > `TupleStreamSiddhiOperator`), supporting rich CEP features like > * Filter > * Join > * Aggregation > * Group by > * Having > * Window > * Conditions and Expressions > * Pattern processing > * Sequence processing > * Event Tables > ... > * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See > `SiddhiCEP` and `SiddhiStream`) > * Register Flink DataStream associating native type information with > Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. 
> * Connect with single or multiple Flink DataStreams with Siddhi CEP > Execution Plan > * Return output stream as DataStream with type intelligently inferred > from Siddhi Stream Schema > * Integrate siddhi runtime state management with Flink state (See > `AbstractSiddhiOperator`) > * Support siddhi plugin management to extend CEP functions. (See > `SiddhiCEP#registerExtension`) > h2. Test Cases > * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: > https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java > h2. Example > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); > cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); > cep.registerStream("inputStream1", input1, "id", "name", > "price","timestamp"); > cep.registerStream("inputStream2", input2, "id", "name", > "price","timestamp"); > DataStream> output = cep > .from("inputStream1").union("inputStream2") > .sql( > "from every s1 = inputStream1[id == 2] " > + " -> s2 = inputStream2[id == 3] " > + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as > name_2 , custom:plus(s1.price,s2.price) as price" > + "insert into outputStream" > ) > .returns("outputStream"); > env.execute(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2486: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...
Github user haoch commented on the issue: https://github.com/apache/flink/pull/2486 Closed this PR for cleaning commit history and recreated another PR #2487 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478958#comment-15478958 ] ASF GitHub Bot commented on FLINK-4520: --- GitHub user haoch opened a pull request: https://github.com/apache/flink/pull/2487 [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-weight Streaming CEP Library Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed # Abstraction Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries. __It would be very helpful for flink users (especially streaming application developer) to provide a library to run Siddhi CEP query directly in Flink streaming application.__ # Features * Integrate Siddhi CEP as an stream operator (i.e. 
`TupleStreamSiddhiOperator`), supporting rich CEP features like * Filter * Join * Aggregation * Group by * Having * Window * Conditions and Expressions * Pattern processing * Sequence processing * Event Tables ... * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`) * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`) * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`) # Test Cases * [`org.apache.flink.contrib.siddhi. SiddhiCEPITCase `](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java) # Example StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp"); cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp"); DataStream> output = cep .from("inputStream1").union("inputStream2") .sql( "from every s1 = inputStream1[id == 2] " + " -> s2 = inputStream2[id == 3] " + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price" + "insert into outputStream" ) .returns("outputStream"); env.execute(); You can merge this pull request into a Git repository by running: $ git pull https://github.com/haoch/flink FLINK-4520 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2487.patch To 
close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2487 commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0 Author: Hao Chen Date: 2016-08-29T12:34:42Z Implement initial version of flink-siddhi stream operator commit
[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...
GitHub user haoch opened a pull request: https://github.com/apache/flink/pull/2487 [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-weight Streaming CEP Library Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed # Abstraction Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries. __It would be very helpful for flink users (especially streaming application developer) to provide a library to run Siddhi CEP query directly in Flink streaming application.__ # Features * Integrate Siddhi CEP as an stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like * Filter * Join * Aggregation * Group by * Having * Window * Conditions and Expressions * Pattern processing * Sequence processing * Event Tables ... 
* Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`) * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`) * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`) # Test Cases * [`org.apache.flink.contrib.siddhi. SiddhiCEPITCase `](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java) # Example StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp"); cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp"); DataStream> output = cep .from("inputStream1").union("inputStream2") .sql( "from every s1 = inputStream1[id == 2] " + " -> s2 = inputStream2[id == 3] " + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price" + "insert into outputStream" ) .returns("outputStream"); env.execute(); You can merge this pull request into a Git repository by running: $ git pull https://github.com/haoch/flink FLINK-4520 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2487.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2487 commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0 Author: Hao Chen Date: 
2016-08-29T12:34:42Z Implement initial version of flink-siddhi stream operator commit d8b131d9204e9e359afae80087afe2aecab27eaf Author: Chen, Hao Date: 2016-08-30T14:33:59Z Implement SiddhiStream API and DSL commit 56c150fadc54d06c186223e43a6fd9ac74eee837 Author: Chen, Hao Date: 2016-08-30T18:31:18Z Reformat
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478928#comment-15478928 ] ASF GitHub Bot commented on FLINK-4520: --- Github user haoch closed the pull request at: https://github.com/apache/flink/pull/2486 > Integrate Siddhi as a lightweight CEP Library > - > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP >Affects Versions: 1.2.0 >Reporter: Hao Chen > Labels: cep, library > > h1. flink-siddhi proposal > h2. Abstraction > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user specified queries. > It would be very helpful for flink users (especially streaming application > developer) to provide a library to run Siddhi CEP query directly in Flink > streaming application. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi > h2. Features > * Integrate Siddhi CEP as an stream operator (i.e. > `TupleStreamSiddhiOperator`), supporting rich CEP features like > * Filter > * Join > * Aggregation > * Group by > * Having > * Window > * Conditions and Expressions > * Pattern processing > * Sequence processing > * Event Tables > ... > * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See > `SiddhiCEP` and `SiddhiStream`) > * Register Flink DataStream associating native type information with > Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. 
> * Connect with single or multiple Flink DataStreams with Siddhi CEP > Execution Plan > * Return output stream as DataStream with type intelligently inferred > from Siddhi Stream Schema > * Integrate siddhi runtime state management with Flink state (See > `AbstractSiddhiOperator`) > * Support siddhi plugin management to extend CEP functions. (See > `SiddhiCEP#registerExtension`) > h2. Test Cases > * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: > https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java > h2. Example > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); > cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); > cep.registerStream("inputStream1", input1, "id", "name", > "price","timestamp"); > cep.registerStream("inputStream2", input2, "id", "name", > "price","timestamp"); > DataStream> output = cep > .from("inputStream1").union("inputStream2") > .sql( > "from every s1 = inputStream1[id == 2] " > + " -> s2 = inputStream2[id == 3] " > + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as > name_2 , custom:plus(s1.price,s2.price) as price" > + "insert into outputStream" > ) > .returns("outputStream"); > env.execute(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2486: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...
Github user haoch closed the pull request at: https://github.com/apache/flink/pull/2486 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2465: [FLINK-4447] [docs] Include NettyConfig options on Config...
Github user nssalian commented on the issue: https://github.com/apache/flink/pull/2465 Hi @greghogan, thanks for the PR. I took a look at it and it seems good to go. The test failure doesn't seem related. Since I'm new to the community, I would probably ask someone else to have a look as well. @uce, could you take a pass over this?
[jira] [Commented] (FLINK-4447) Include NettyConfig options on Configurations page
[ https://issues.apache.org/jira/browse/FLINK-4447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478759#comment-15478759 ] ASF GitHub Bot commented on FLINK-4447: --- Github user nssalian commented on the issue: https://github.com/apache/flink/pull/2465 Hi @greghogan thanks for the PR. Took a look at it and seems good to go. The test failure doesn't seem related. Since I'm new to the community, I would probably ask someone else to have a look as well. @uce , if you can have a pass over this? > Include NettyConfig options on Configurations page > -- > > Key: FLINK-4447 > URL: https://issues.apache.org/jira/browse/FLINK-4447 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.2.0 > > > {{NettyConfig}} looks for the following configuration options which are not > listed in the Flink documentation. > {noformat} > public static final String NUM_ARENAS = "taskmanager.net.num-arenas"; > public static final String NUM_THREADS_SERVER = > "taskmanager.net.server.numThreads"; > public static final String NUM_THREADS_CLIENT = > "taskmanager.net.client.numThreads"; > public static final String CONNECT_BACKLOG = > "taskmanager.net.server.backlog"; > public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = > "taskmanager.net.client.connectTimeoutSec"; > public static final String SEND_RECEIVE_BUFFER_SIZE = > "taskmanager.net.sendReceiveBufferSize"; > public static final String TRANSPORT_TYPE = "taskmanager.net.transport"; > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2442: [FLINK-4148] incorrect calculation minDist distance in Qu...
Github user nssalian commented on the issue: https://github.com/apache/flink/pull/2442 Hi @xhumanoid, thanks for the PR. Could you please check the failure messages and fix the build? I can help review once the PR is cleanly mergeable.
[jira] [Commented] (FLINK-4148) incorrect calculation distance in QuadTree
[ https://issues.apache.org/jira/browse/FLINK-4148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478750#comment-15478750 ] ASF GitHub Bot commented on FLINK-4148: --- Github user nssalian commented on the issue: https://github.com/apache/flink/pull/2442 Hi @xhumanoid , thanks for the PR. Could you please check the Failure messages and fix the build? Can help review once the PR is cleanly mergable. > incorrect calculation distance in QuadTree > -- > > Key: FLINK-4148 > URL: https://issues.apache.org/jira/browse/FLINK-4148 > Project: Flink > Issue Type: Bug >Reporter: Alexey Diomin >Priority: Trivial > Attachments: > 0001-FLINK-4148-incorrect-calculation-minDist-distance-in.patch > > > https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/QuadTree.scala#L105 > Because EuclideanDistanceMetric extends SquaredEuclideanDistanceMetric we > always move in first case and never reach case for math.sqrt(minDist) > correct match first EuclideanDistanceMetric and after it > SquaredEuclideanDistanceMetric > p.s. because EuclideanDistanceMetric more compute expensive and stay as > default DistanceMetric it's can cause some performance degradation for KNN on > default parameters -- This message was sent by Atlassian JIRA (v6.3.4#6332)
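For context, the bug described above comes down to subtype matching order: because `EuclideanDistanceMetric` extends `SquaredEuclideanDistanceMetric`, a case for the supertype placed first also swallows subclass instances, so the `math.sqrt(minDist)` branch is never reached. A minimal Java sketch of the pattern (the class names mirror Flink's metrics, but the code is illustrative, not Flink's actual QuadTree):

```java
// Illustrative stand-ins: in Flink ML, EuclideanDistanceMetric
// extends SquaredEuclideanDistanceMetric.
class SquaredEuclideanDistanceMetric { }

class EuclideanDistanceMetric extends SquaredEuclideanDistanceMetric { }

public class MatchOrder {
    // Buggy ordering: every EuclideanDistanceMetric instance is also a
    // SquaredEuclideanDistanceMetric, so the first branch always wins
    // and the sqrt branch is dead code.
    static double minDistBuggy(SquaredEuclideanDistanceMetric m, double minDist) {
        if (m instanceof SquaredEuclideanDistanceMetric) {
            return minDist;              // always taken
        } else if (m instanceof EuclideanDistanceMetric) {
            return Math.sqrt(minDist);   // unreachable
        }
        return minDist;
    }

    // Fixed ordering: test the more specific subtype first.
    static double minDistFixed(SquaredEuclideanDistanceMetric m, double minDist) {
        if (m instanceof EuclideanDistanceMetric) {
            return Math.sqrt(minDist);
        }
        return minDist;
    }

    public static void main(String[] args) {
        SquaredEuclideanDistanceMetric euclidean = new EuclideanDistanceMetric();
        System.out.println(minDistBuggy(euclidean, 16.0)); // squared distance leaks through
        System.out.println(minDistFixed(euclidean, 16.0)); // square root applied
    }
}
```

The same hazard exists in Scala pattern matching, which is why the patch reorders the cases so `EuclideanDistanceMetric` is matched before its superclass.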
[jira] [Commented] (FLINK-3457) Link to Apache Flink meetups from the 'Community' section of the website
[ https://issues.apache.org/jira/browse/FLINK-3457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478687#comment-15478687 ] Neelesh Srinivas Salian commented on FLINK-3457: Hi [~xazax], are you working on this? If not, shall I grab it and post a PR? > Link to Apache Flink meetups from the 'Community' section of the website > > > Key: FLINK-3457 > URL: https://issues.apache.org/jira/browse/FLINK-3457 > Project: Flink > Issue Type: Task > Components: Documentation >Reporter: Slim Baltagi >Assignee: Gabor Horvath >Priority: Trivial > > Now with the number of Apache Flink meetups increasing worldwide, it is > helpful to add a link to Apache Flink meetups > http://www.meetup.com/topics/apache-flink/ to the community section of > https://flink.apache.org/community.html so visitors can conveniently find > them right from the Apache Flink website. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2859) Add links to docs and JIRA issues in FlinkML roadmap
[ https://issues.apache.org/jira/browse/FLINK-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neelesh Srinivas Salian closed FLINK-2859. -- Resolution: Fixed This is fixed since the page has the link to the FLINK-ML JIRAs list. > Add links to docs and JIRA issues in FlinkML roadmap > > > Key: FLINK-2859 > URL: https://issues.apache.org/jira/browse/FLINK-2859 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Theodore Vasiloudis >Priority: Trivial > Labels: ML > > The FlinkML [vision and roadmap > doc|https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap] > lists a number of features we aim to have in FlinkML. > It would be helpful to newcomers if each feature linked to its corresponding > JIRA issue, and already implemented features to their page in the docs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-1914) Wrong FS while starting YARN session without correct HADOOP_HOME
[ https://issues.apache.org/jira/browse/FLINK-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neelesh Srinivas Salian closed FLINK-1914. -- Resolution: Fixed > Wrong FS while starting YARN session without correct HADOOP_HOME > > > Key: FLINK-1914 > URL: https://issues.apache.org/jira/browse/FLINK-1914 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Zoltán Zvara >Priority: Trivial > Labels: yarn, yarn-client > > When YARN session invoked ({{yarn-session.sh}}) without a correct > {{HADOOP_HOME}} (AM still deployed to - for example to {{0.0.0.0:8032}}), but > the deployed AM fails with an {{IllegalArgumentException}}: > {code} > java.lang.IllegalArgumentException: Wrong FS: > file:/home/.../flink-dist-0.9-SNAPSHOT.jar, expected: hdfs://localhost:9000 > at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642) > at > org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:181) > at > org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92) > at > org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106) > at > org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102) > at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:105) > at > org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:436) > at > org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:371) > at scala.util.Try$.apply(Try.scala:161) > at > 
org.apache.flink.yarn.ApplicationMasterActor$class.org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession(ApplicationMasterActor.scala:371) > at > org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:155) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94) > {code} > IMO this {{IllegalArgumentException}} should get handled in > {{org.apache.flink.yarn.Utils.registerLocalResource}} or on an upper level to > provide a better error message. This needs to be looked up from YARN logs at > the moment, which is painful to a trivial mistake like missing > {{HADOOP_HOME}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
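The improvement the reporter asks for — catching the mismatch and explaining it — can be sketched as a plain URI scheme check before handing the path to the file system. This is a hedged, illustrative example; the method name and message are assumptions, not Flink's actual `Utils.registerLocalResource` API:

```java
import java.net.URI;

// Sketch of a pre-check that would turn the raw "Wrong FS"
// IllegalArgumentException into an actionable error message.
public class FsSchemeCheck {
    static void checkScheme(URI path, URI expectedFs) {
        // A path with no scheme is treated as a local file.
        String actual = path.getScheme() == null ? "file" : path.getScheme();
        String expected = expectedFs.getScheme();
        if (!actual.equals(expected)) {
            throw new IllegalArgumentException(
                "Resource " + path + " uses file system scheme '" + actual
                + "' but the cluster default file system is '" + expected
                + "'. Is HADOOP_HOME / the Hadoop configuration set correctly?");
        }
    }
}
```

With such a check, a missing `HADOOP_HOME` would surface as a clear client-side message instead of a stack trace buried in the YARN logs.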
[jira] [Commented] (FLINK-1914) Wrong FS while starting YARN session without correct HADOOP_HOME
[ https://issues.apache.org/jira/browse/FLINK-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478539#comment-15478539 ] Neelesh Srinivas Salian commented on FLINK-1914: Sounds good [~rmetzger]. Closing the JIRA. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-710) Aggregate transformation works only with FieldPositionKeys
[ https://issues.apache.org/jira/browse/FLINK-710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neelesh Srinivas Salian closed FLINK-710. - Resolution: Duplicate > Aggregate transformation works only with FieldPositionKeys > -- > > Key: FLINK-710 > URL: https://issues.apache.org/jira/browse/FLINK-710 > Project: Flink > Issue Type: Improvement >Reporter: GitHub Import >Assignee: Márton Balassi >Priority: Trivial > Labels: github-import > > In the new Java API, Aggregate transformations can only be applied to > DataSets that are grouped using field positions and to ungrouped DataSets. > DataSets grouped with KeySelector functions are not supported. > Since Aggregations only work on Tuple DataSets which can be grouped using the > more convenient field positions, this might be OK. > Or should we support KeySelector groupings as well for completeness? > Opinions? > Imported from GitHub > Url: https://github.com/stratosphere/stratosphere/issues/710 > Created by: [fhueske|https://github.com/fhueske] > Labels: enhancement, java api, question, user satisfaction, > Milestone: Release 0.6 (unplanned) > Created at: Tue Apr 22 14:42:52 CEST 2014 > State: open -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-2144) Incremental count, average, and variance for windows
[ https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477990#comment-15477990 ] Daniel Blazevski edited comment on FLINK-2144 at 9/9/16 8:49 PM: - Hi, I am curious to know the status of this issue. I have worked on Flink-ML a bit, and recently contributed to Flink's documentation of its Window API. If possible, I would like to work on this. I looked into the code and saw how "sum" is implemented, and I assume average and variance should be implemented in similar ways (with two versions, one taking a string and another an integer). Count, of course, is simpler in that it counts the number of data points (not two different inputs). [~aljoscha][~ggevay] > Incremental count, average, and variance for windows > > > Key: FLINK-2144 > URL: https://issues.apache.org/jira/browse/FLINK-2144 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Gabor Gevay >Priority: Minor > Labels: statistics > > By count I mean the number of elements in the window. > These can be implemented very efficiently building on FLINK-2143: > Store: O(1) > Evict: O(1) > emitWindow: O(1) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2144) Incremental count, average, and variance for windows
[ https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477990#comment-15477990 ] Daniel Blazevski commented on FLINK-2144: - Hi, I am curious to know the status of this issue. I have worked on Flink-ML a bit, and recently contributed to Flink's documentation of its Window API. If possible, I would like to work on this. I looked into the code and saw how "sum" is implemented, and I assume average and variance should be implemented in similar ways (with two versions, one taking a string and another an integer). Count, of course, is simpler in that it counts the number of data points (not two different inputs). > Incremental count, average, and variance for windows > > > Key: FLINK-2144 > URL: https://issues.apache.org/jira/browse/FLINK-2144 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Gabor Gevay >Priority: Minor > Labels: statistics > > By count I mean the number of elements in the window. > These can be implemented very efficiently building on FLINK-2143: > Store: O(1) > Evict: O(1) > emitWindow: O(1) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
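The O(1) Store/Evict/emitWindow bounds quoted in the issue can be met by keeping running aggregates that support both insertion and eviction. A minimal Java sketch under that assumption (the class and method names are illustrative, not Flink's actual window API):

```java
// O(1) incremental count, average, and population variance for a window,
// maintained via count, sum, and sum of squares so that both store (insert)
// and evict (remove) are constant-time updates.
public class IncrementalStats {
    private long count;
    private double sum;
    private double sumSq;

    void store(double x) { count++; sum += x; sumSq += x * x; }
    void evict(double x) { count--; sum -= x; sumSq -= x * x; }

    long count() { return count; }

    double average() { return count == 0 ? Double.NaN : sum / count; }

    // Population variance via E[X^2] - (E[X])^2.
    double variance() {
        if (count == 0) return Double.NaN;
        double mean = sum / count;
        return sumSq / count - mean * mean;
    }
}
```

Maintaining the sum of squares is what makes eviction O(1), at some cost in numerical stability for large values; for insert-only streams, Welford's online algorithm would be the more robust choice.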
[jira] [Updated] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Chen updated FLINK-4520: Description: h1. flink-siddhi proposal h2. Abstraction Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries. It would be very helpful for flink users (especially streaming application developer) to provide a library to run Siddhi CEP query directly in Flink streaming application. * http://wso2.com/products/complex-event-processor/ * https://github.com/wso2/siddhi h2. Features * Integrate Siddhi CEP as an stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like * Filter * Join * Aggregation * Group by * Having * Window * Conditions and Expressions * Pattern processing * Sequence processing * Event Tables ... * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`) * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`) * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`) h2. Test Cases * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java h2. 
Example {code} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp"); cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp"); DataStream> output = cep .from("inputStream1").union("inputStream2") .sql( "from every s1 = inputStream1[id == 2] " + " -> s2 = inputStream2[id == 3] " + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price" + "insert into outputStream" ) .returns("outputStream"); env.execute(); {code} was: h1. flink-siddhi proposal h2. Abstraction Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries. It would be very helpful for flink users (especially streaming application developer) to provide a library to run Siddhi CEP query directly in Flink streaming application. * http://wso2.com/products/complex-event-processor/ * https://github.com/wso2/siddhi h2. Features * Integrate Siddhi CEP as an stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like * Filter * Join * Aggregation * Group by * Having * Window * Conditions and Expressions * Pattern processing * Sequence processing * Event Tables ... * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`) * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. 
* Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`) * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`) h2. Test Cases * [`org.apache.flink.contrib.siddhi. SiddhiCEPITCase `](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java) h2. Example {code} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp"); cep.registerStream("inputStream2", input2, "id",
[jira] [Updated] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Chen updated FLINK-4520: Description: h1. flink-siddhi proposal h2. Abstraction Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries. It would be very helpful for flink users (especially streaming application developer) to provide a library to run Siddhi CEP query directly in Flink streaming application. * http://wso2.com/products/complex-event-processor/ * https://github.com/wso2/siddhi h2. Features * Integrate Siddhi CEP as an stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like * Filter * Join * Aggregation * Group by * Having * Window * Conditions and Expressions * Pattern processing * Sequence processing * Event Tables ... * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`) * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`) * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`) h2. Test Cases * [`org.apache.flink.contrib.siddhi. SiddhiCEPITCase `](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java) h2. 
Example {code} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp"); cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp"); DataStream> output = cep .from("inputStream1").union("inputStream2") .sql( "from every s1 = inputStream1[id == 2] " + " -> s2 = inputStream2[id == 3] " + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price" + "insert into outputStream" ) .returns("outputStream"); env.execute(); {code} was: h1. Flink Siddhi CEP Integration Proposal h2. About Siddhi CEP Siddhi CEP is a lightweight, easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under Apache Software License v2.0. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries. * http://wso2.com/products/complex-event-processor/ * https://github.com/wso2/siddhi h2. Proposal As known, siddhi is very lightweight and rich featured CEP library, supporting most traditional CEP cases like: * Filter * Join * Aggregation * Group by * Having * Window * Conditions and Expressions * Pattern processing * Sequence processing * Event Tables * Partitions * Scripting:Support JavaScript & Scala Scripts within Siddhi Queries * Query: SQL like query language The proposal is to * Embed siddhi CEP as an stream operator of Flink * Support native siddhi query, extensions to run inside Flink StreamExecutionEnvironment * Integrate state management. * Provide consistent DSL to integrate with Flink programing API. 
> Integrate Siddhi as a lightweight CEP Library > - > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP >Affects Versions: 1.2.0 >Reporter: Hao Chen > Labels: cep, library > > h1. flink-siddhi proposal > h2. Abstraction > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user specified queries. > It would be very helpful for flink users (especially streaming application > developer) to provide a library to run Siddhi CEP query directly in Flink > streaming application. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi
[GitHub] flink pull request #2486: FLINK-4520 Integrate Siddhi as a light-weight Stre...
GitHub user haoch opened a pull request:

    https://github.com/apache/flink/pull/2486

    FLINK-4520 Integrate Siddhi as a light-weight Streaming CEP Library

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

# Abstract

Siddhi CEP is a lightweight, easy-to-use open-source Complex Event Processing (CEP) engine released as a Java library under the Apache Software License v2.0. Siddhi processes events generated by various event sources, analyses them, and notifies the appropriate complex events according to user-specified queries. __It would be very helpful for Flink users (especially streaming application developers) to provide a library that runs Siddhi CEP queries directly inside a Flink streaming application.__

# Features

* Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features such as:
  * Filter
  * Join
  * Aggregation
  * Group by
  * Having
  * Window
  * Conditions and Expressions
  * Pattern processing
  * Sequence processing
  * Event Tables
  * ...
* Provide an easy-to-use Siddhi CEP API that integrates with the Flink DataStream API (see `SiddhiCEP` and `SiddhiStream`)
* Register a Flink DataStream, associating native type information with the Siddhi stream schema; POJO, Tuple, and primitive types are supported
* Connect one or multiple Flink DataStreams to a Siddhi CEP execution plan
* Return the output stream as a DataStream whose type is inferred from the Siddhi stream schema
* Integrate Siddhi runtime state management with Flink state (see `AbstractSiddhiOperator`)
* Support Siddhi plugin management to extend CEP functions (see `SiddhiCEP#registerExtension`)

# Test Cases

* [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)

# Example

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

    cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
    cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
    cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");

    DataStream> output = cep
        .from("inputStream1").union("inputStream2")
        .sql(
            "from every s1 = inputStream1[id == 2] " +
            " -> s2 = inputStream2[id == 3] " +
            "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, custom:plus(s1.price, s2.price) as price " +
            "insert into outputStream"
        )
        .returns("outputStream");

    env.execute();

(Note that a trailing space is needed at the end of the `select` fragment so the concatenated query does not run `price` and `insert` together.)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haoch/flink FLINK-4520

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2486.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2486

commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0
Author: Hao Chen
Date: 2016-08-29T12:34:42Z

    Implement initial version of flink-siddhi stream operator

commit d8b131d9204e9e359afae80087afe2aecab27eaf
Author: Chen, Hao
Date: 2016-08-30T14:33:59Z

    Implement SiddhiStream API and DSL

commit 56c150fadc54d06c186223e43a6fd9ac74eee837
Author: Chen, Hao
Date: 2016-08-30T18:31:18Z

    Reformat code
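For readability, the Siddhi QL statement embedded in the example above can be laid out on its own (whitespace-only reformatting of the query string; `every` and the `->` followed-by operator are standard SiddhiQL pattern syntax, but verify against the documentation of the Siddhi version the PR bundles):

```sql
from every s1 = inputStream1[id == 2]
     -> s2 = inputStream2[id == 3]
select s1.id as id_1, s1.name as name_1,
       s2.id as id_2, s2.name as name_2,
       custom:plus(s1.price, s2.price) as price
insert into outputStream
```

The pattern matches every occurrence of an `inputStream1` event with `id == 2` followed (at any later time) by an `inputStream2` event with `id == 3`, and emits one joined event per match into `outputStream`.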
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477941#comment-15477941 ] ASF GitHub Bot commented on FLINK-4520: --- GitHub user haoch opened a pull request: https://github.com/apache/flink/pull/2486 FLINK-4520 Integrate Siddhi as a light-weight Streaming CEP Library
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477764#comment-15477764 ] ASF GitHub Bot commented on FLINK-3921: --- Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2060 Thanks @greghogan for your review. Please have a look. thanks! > StringParser not specifying encoding to use > --- > > Key: FLINK-3921 > URL: https://issues.apache.org/jira/browse/FLINK-3921 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.3 >Reporter: Tatu Saloranta >Assignee: Rekha Joshi >Priority: Trivial > > Class `flink.types.parser.StringParser` has javadocs indicating that contents > are expected to be Ascii, similar to `StringValueParser`. That makes sense, > but when constructing actual instance, no encoding is specified; on line 66 > f.ex: >this.result = new String(bytes, startPos+1, i - startPos - 2); > which leads to using whatever default platform encoding is. If contents > really are always Ascii (would not count on that as parser is used from CSV > reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues. > So I think that encoding should be explicitly specified, whatever is to be > used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 > or even ISO-8859-1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
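The hazard described in the issue — `new String(bytes, offset, length)` silently using the platform default charset — is easy to demonstrate with plain JDK code (no Flink types; a minimal sketch): the byte `0xE9` is `é` in ISO-8859-1 but is not a valid UTF-8 sequence on its own.

```java
import java.nio.charset.StandardCharsets;

public class CharsetDemo {
    public static void main(String[] args) {
        // "é" (U+00E9) encoded in ISO-8859-1 (Latin-1) is the single byte 0xE9.
        byte[] latin1Bytes = {(byte) 0xE9};

        // Explicit charset: deterministic on every platform.
        String decodedLatin1 = new String(latin1Bytes, StandardCharsets.ISO_8859_1);

        // Decoding the same byte as UTF-8 yields U+FFFD (the replacement
        // character), because a lone 0xE9 is an incomplete UTF-8 sequence.
        String decodedUtf8 = new String(latin1Bytes, StandardCharsets.UTF_8);

        System.out.println(decodedLatin1.equals("\u00E9")); // true
        System.out.println(decodedUtf8.equals("\u00E9"));   // false
    }
}
```

This is exactly the Latin-1-vs-UTF-8 confusion the reporter mentions, and why the review converges on passing an explicit `Charset` down to the parsers instead of relying on the no-charset `String` constructor.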
[GitHub] flink issue #2060: [FLINK-3921] StringParser encoding
Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2060 Thanks @greghogan for your review. Please have a look. thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477588#comment-15477588 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 Thanks, will try that. > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. >
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 Thanks, will try that.
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477573#comment-15477573 ] ASF GitHub Bot commented on FLINK-3929: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275 I would suggest to just use your own Travis account. It is free; you just need to connect it with your account. Then push to any branch in your GitHub account and Travis will run.
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275 I would suggest to just use your own Travis account. It is free; you just need to connect it with your account. Then push to any branch in your GitHub account and Travis will run.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477565#comment-15477565 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2463 I've rebased the pull request and incorporated your suggestions. > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels >
[GitHub] flink issue #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allocation ...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2463 I've rebased the pull request and incorporated your suggestions.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477560#comment-15477560 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r78212144

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
	 * RPC's main thread to avoid race condition).
	 *
	 * @param request The detailed request of the slot
+	 * @return SlotRequestRegistered The confirmation message to be send to the caller
	 */
-	public void requestSlot(final SlotRequest request) {
+	public SlotRequestRegistered requestSlot(final SlotRequest request) {
+		final AllocationID allocationId = request.getAllocationId();
		if (isRequestDuplicated(request)) {
-			LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
-			return;
+			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
+			return null;
		}

		// try to fulfil the request with current free slots
-		ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
		if (slot != null) {
			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
-				request.getAllocationId(), request.getJobId());
+				allocationId, request.getJobId());

			// record this allocation in bookkeeping
-			allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
+			allocationMap.addAllocation(slot.getSlotId(), allocationId);

			// remove selected slot from free pool
			freeSlots.remove(slot.getSlotId());

-			// TODO: send slot request to TaskManager
+			slot.getTaskExecutorGateway()
+				.requestSlot(allocationId, leaderIdRegistry.getLeaderID());
--- End diff --

Thank you for your comments @beyond1920. Your observations are correct.
I've skipped this part of the implementation and wanted to address it next.
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r78212144 Thank you for your comments @beyond1920. Your observations are correct. I've skipped this part of the implementation and wanted to address it next.
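The control flow under discussion in the diff can be reduced to a small, dependency-free sketch (the `String`-typed ids and the class name are simplified stand-ins for Flink's `SlotID`/`AllocationID` runtime types, and the TaskExecutor RPC is omitted):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of SlotManager.requestSlot: reject duplicated requests,
// then move a free slot into the allocation bookkeeping.
class SlotManagerSketch {
    private final Map<String, String> freeSlots = new LinkedHashMap<>();   // slotId -> resource profile
    private final Map<String, String> allocations = new HashMap<>();       // slotId -> allocationId

    /**
     * Returns the assigned slot id, or null if the request is a duplicate
     * or no free slot is available (the real SlotManager would queue it).
     */
    String requestSlot(String allocationId) {
        if (allocations.containsValue(allocationId)) {
            return null; // duplicated slot request, as in the LOG.warn branch
        }
        if (freeSlots.isEmpty()) {
            return null;
        }
        // stand-in for chooseSlotToUse(request, freeSlots)
        String slotId = freeSlots.keySet().iterator().next();
        freeSlots.remove(slotId);                // remove selected slot from free pool
        allocations.put(slotId, allocationId);   // record this allocation in bookkeeping
        return slotId;                           // real code then RPCs the TaskExecutor
    }

    void addFreeSlot(String slotId) {
        freeSlots.put(slotId, "default");
    }

    public static void main(String[] args) {
        SlotManagerSketch manager = new SlotManagerSketch();
        manager.addFreeSlot("slot-1");
        System.out.println(manager.requestSlot("alloc-A")); // slot-1
        System.out.println(manager.requestSlot("alloc-A")); // null (duplicate)
    }
}
```

The point raised in the review thread is about what happens after the last step: the returned confirmation (`SlotRequestRegistered` in the actual patch) acknowledges registration of the request, while the slot assignment itself is confirmed asynchronously by the TaskExecutor.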
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477387#comment-15477387 ] ASF GitHub Bot commented on FLINK-3921: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78203358

--- Diff: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -75,8 +76,30 @@
	/** Invalid Boolean value **/
	BOOLEAN_INVALID
}
-
+
+	private Charset charset = Charset.forName("US-ASCII");
+	private ParseErrorState errorState = ParseErrorState.NONE;
+
+	/**
+	 * Parses the value of a field from the byte array.
+	 * The start position within the byte array and the array's valid length is given.
+	 * The content of the value is delimited by a field delimiter.
+	 *
+	 * @param bytes The byte array that holds the value.
+	 * @param startPos The index where the field starts
+	 * @param limit The limit unto which the byte contents is valid for the parser. The limit is the
+	 *  position one after the last valid byte.
+	 * @param delim The field delimiter character
+	 * @param reuse An optional reusable field to hold the value
+	 * @param charset The charset to parse with
+	 *
+	 * @return The index of the next delimiter, if the field was parsed correctly. A value less than 0 otherwise.
+	 */
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse, Charset charset){
+		this.charset = charset;
--- End diff --

Is this method needed? `GenericCsvInputFormat.open` can `setCharset` on each newly instantiated `FieldParser`, and in the case where a user decided to change charset on an open file `GenericCsvInputFormat.setCharset` could go through and `setCharset` on the list of `FieldParser`s.
[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78203358 Is this method needed? `GenericCsvInputFormat.open` can `setCharset` on each newly instantiated `FieldParser`, and in the case where a user decided to change charset on an open file `GenericCsvInputFormat.setCharset` could go through and `setCharset` on the list of `FieldParser`s.
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 Hmm, it's getting complicated; I will try to debug the issue. Do you know why it fails on Travis but not on Jenkins? How do I simulate this on Travis for my own testing?
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477337#comment-15477337 ] ASF GitHub Bot commented on FLINK-3921: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199868 --- Diff: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java --- @@ -75,8 +76,30 @@ /** Invalid Boolean value **/ BOOLEAN_INVALID } - + + private Charset charset = Charset.forName("US-ASCII"); --- End diff -- Should this also default to `UTF-8`?
[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199868 --- Diff: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java --- @@ -75,8 +76,30 @@ /** Invalid Boolean value **/ BOOLEAN_INVALID } - + + private Charset charset = Charset.forName("US-ASCII"); --- End diff -- Should this also default to `UTF-8`?
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477333#comment-15477333 ] ASF GitHub Bot commented on FLINK-3921: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199594 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java --- @@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } + /** +* Gets the Charset for the parser.Default is set to UTF-8 +* +* @return The charset for the parser. +*/ + public Charset getCharset() { + return this.charset; + } + + /** +* Sets the charset of the parser. Called by subclasses of the parser to set the type of charset +* when doing a parse. +* +* @param charset The charset to set. +*/ + protected void setCharset(Charset charset){ --- End diff -- Add space between `)` and `{` (and I count two more instances).
[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199594 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java --- @@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } + /** +* Gets the Charset for the parser.Default is set to UTF-8 +* +* @return The charset for the parser. +*/ + public Charset getCharset() { + return this.charset; + } + + /** +* Sets the charset of the parser. Called by subclasses of the parser to set the type of charset +* when doing a parse. +* +* @param charset The charset to set. +*/ + protected void setCharset(Charset charset){ --- End diff -- Add space between `)` and `{` (and I count two more instances).
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477321#comment-15477321 ] ASF GitHub Bot commented on FLINK-3921: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199216 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java --- @@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } + /** +* Gets the Charset for the parser.Default is set to UTF-8 --- End diff -- Should we spell out `charset` as "character set" in the documentation? Also, space after period and missing period at end.
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477325#comment-15477325 ] ASF GitHub Bot commented on FLINK-3921: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199365 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java --- @@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } + /** +* Gets the Charset for the parser.Default is set to UTF-8 +* +* @return The charset for the parser. +*/ + public Charset getCharset() { + return this.charset; + } + + /** +* Sets the charset of the parser. Called by subclasses of the parser to set the type of charset +* when doing a parse. +* +* @param charset The charset to set. +*/ + protected void setCharset(Charset charset){ + this.charset = charset != null ? charset : Charset.forName("UTF-8"); --- End diff -- Use `Preconditions.checkNotNull`?
[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199365 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java --- @@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } + /** +* Gets the Charset for the parser.Default is set to UTF-8 +* +* @return The charset for the parser. +*/ + public Charset getCharset() { + return this.charset; + } + + /** +* Sets the charset of the parser. Called by subclasses of the parser to set the type of charset +* when doing a parse. +* +* @param charset The charset to set. +*/ + protected void setCharset(Charset charset){ + this.charset = charset != null ? charset : Charset.forName("UTF-8"); --- End diff -- Use `Preconditions.checkNotNull`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
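The review comment contrasts two null-handling policies: the patch silently falls back to UTF-8, while `Preconditions.checkNotNull` would fail fast. A minimal sketch of the fail-fast variant, using the JDK's `Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull` (same behavior for this purpose); the class name is ours:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class CharsetHolder {

    private Charset charset = StandardCharsets.UTF_8; // documented default

    public Charset getCharset() {
        return charset;
    }

    // Fail fast on null instead of silently substituting the default.
    // Objects.requireNonNull stands in for Flink's Preconditions.checkNotNull here.
    // (In GenericCsvInputFormat the setter is protected; public here for the sketch.)
    public void setCharset(Charset charset) {
        this.charset = Objects.requireNonNull(charset, "charset must not be null");
    }
}
```

Failing fast makes a caller bug visible at the call site, whereas the silent fallback can mask a misconfigured format until data is decoded wrongly.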
[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199262 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java --- @@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } + /** +* Gets the Charset for the parser.Default is set to UTF-8 +* +* @return The charset for the parser. +*/ + public Charset getCharset() { + return this.charset; + } + + /** +* Sets the charset of the parser. Called by subclasses of the parser to set the type of charset +* when doing a parse. +* +* @param charset The charset to set. --- End diff -- Double space.
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477323#comment-15477323 ] ASF GitHub Bot commented on FLINK-3921: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199262 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java --- @@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } + /** +* Gets the Charset for the parser.Default is set to UTF-8 +* +* @return The charset for the parser. +*/ + public Charset getCharset() { + return this.charset; + } + + /** +* Sets the charset of the parser. Called by subclasses of the parser to set the type of charset +* when doing a parse. +* +* @param charset The charset to set. --- End diff -- Double space. > StringParser not specifying encoding to use > --- > > Key: FLINK-3921 > URL: https://issues.apache.org/jira/browse/FLINK-3921 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.3 >Reporter: Tatu Saloranta >Assignee: Rekha Joshi >Priority: Trivial > > Class `flink.types.parser.StringParser` has javadocs indicating that contents > are expected to be Ascii, similar to `StringValueParser`. That makes sense, > but when constructing actual instance, no encoding is specified; on line 66 > f.ex: >this.result = new String(bytes, startPos+1, i - startPos - 2); > which leads to using whatever default platform encoding is. If contents > really are always Ascii (would not count on that as parser is used from CSV > reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues. > So I think that encoding should be explicitly specified, whatever is to be > used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 > or even ISO-8859-1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2060: [FLINK-3921] StringParser encoding
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2060#discussion_r78199216 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java --- @@ -314,6 +320,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } + /** +* Gets the Charset for the parser.Default is set to UTF-8 --- End diff -- Should we spell out `charset` as "character set" in the documentation? Also, space after period and missing period at end.
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477315#comment-15477315 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 hmmm.. it's getting complicated. will try to debug the issue. Do you know why on Travis it has to fail but not on Jenkins? How do I simulate this on Travis for my own testing? > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. >
[GitHub] flink issue #2485: [Flink 4599] - Add 'explain()' also to StreamTableEnviron...
Github user chobeat commented on the issue: https://github.com/apache/flink/pull/2485 Done, sorry.
[GitHub] flink issue #2485: [Flink 4599] - Add 'explain()' also to StreamTableEnviron...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2485 Could you rebase your PR on the current master and then do a force push? There are a lot of commits that should not appear here.
[jira] [Created] (FLINK-4605) Add an expression that returns the return type of an expression
Timo Walther created FLINK-4605: --- Summary: Add an expression that returns the return type of an expression Key: FLINK-4605 URL: https://issues.apache.org/jira/browse/FLINK-4605 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther Esp. for Java users of the Table API it is hard to obtain the return type of an expression. I propose to implement an expression that returns the type of an input expression as a string. {{myLong.getType()}} could call the toString method of TypeInformation. This could also be useful to distinguish between different subtypes of POJOs etc.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477252#comment-15477252 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r78194763 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- These are valid points, I will change the code to use the `LeaderRetrievalListener` instead. > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels >
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r78194763 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- These are valid points, I will change the code to use the `LeaderRetrievalListener` instead.
[jira] [Commented] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477240#comment-15477240 ] Timo Walther commented on FLINK-4591: - Ok, Calcite does also not support {{SELECT * FROM WordCount GROUP BY word}}. I'm fine with closing this issue. > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > It would be consistent if this would also work: > {{table.groupBy( '* ).select( "* )}} > Currently, the star only works in a plain select without grouping.
[jira] [Commented] (FLINK-4603) KeyedStateBackend cannot restore user code classes
[ https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477093#comment-15477093 ] Aljoscha Krettek commented on FLINK-4603: - We wouldn't if we always keep everything in serialized form, as is the case for the RocksDB backend. In that case, the user code is only necessary when a state access comes in and when that happens we're already "in" the user code class loader. > KeyedStateBackend cannot restore user code classes > -- > > Key: FLINK-4603 > URL: https://issues.apache.org/jira/browse/FLINK-4603 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Priority: Blocker > Fix For: 1.2.0 > > > A user reported that he cannot restore keyed state which contains user code > classes. I suspect that we don't use the user code class loader to > deserialize the state. > The solution seems to be to forward the user code class loader to the > {{KeyedStateBackends}} when restoring state.
[jira] [Commented] (FLINK-4561) replace all the scala version as a `scala.binary.version` property
[ https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477052#comment-15477052 ] ASF GitHub Bot commented on FLINK-4561: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2459 I would really like to not change the way it is done right now. It works and it is the result of many user interactions, testing across Maven, SBT, Gradle, etc. These various build systems have such subtle differences that I think we should stick with paradigm of "don't change it unless it is broken". > replace all the scala version as a `scala.binary.version` property > -- > > Key: FLINK-4561 > URL: https://issues.apache.org/jira/browse/FLINK-4561 > Project: Flink > Issue Type: Improvement >Reporter: shijinkui > > replace all the scala version(2.10) as a property `scala.binary.version` > defined in root pom properties. default scala version property is 2.10. > modify: > 1. dependency include scala version > 2. module defining include scala version > 3. scala version upgrade to 2.11.8 from 2.11.7
[GitHub] flink issue #2459: [FLINK-4561] replace all the scala version as a `scala.bi...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2459 I would really like to not change the way it is done right now. It works and it is the result of many user interactions, testing across Maven, SBT, Gradle, etc. These various build systems have such subtle differences that I think we should stick with paradigm of "don't change it unless it is broken".
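For context, the change under discussion parameterizes Scala-versioned artifacts with a Maven property instead of hard-coding the `2.10` suffix, roughly along these lines (a sketch of the idea, not Flink's actual pom; `flink.version` is assumed to be defined elsewhere):

```xml
<!-- Root pom: the Scala suffix lives in one property. -->
<properties>
  <scala.binary.version>2.10</scala.binary.version>
</properties>

<!-- Modules then reference the property instead of a literal suffix. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
```

The catch Stephan alludes to is that property interpolation inside `artifactId` behaves differently across Maven, SBT, and Gradle when poms are consumed downstream, which is why the project is cautious about it.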
[jira] [Commented] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477004#comment-15477004 ] Fabian Hueske commented on FLINK-4599: -- Definitely. Would be great if you could add this to the docs. Thanks! > Add 'explain()' also to StreamTableEnvironment > -- > > Key: FLINK-4599 > URL: https://issues.apache.org/jira/browse/FLINK-4599 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > Labels: starter > > Currently, only the BatchTableEnvironment supports the {{explain}} command > for tables. We should also support it for the StreamTableEnvironment.
[jira] [Commented] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477000#comment-15477000 ] Simone Robutti commented on FLINK-4599: --- I'd rather go for the second choice. I've opened the PR. I've noticed that this call is not documented. Do you think it would be useful to add a few lines both for the batch and streaming documentation? > Add 'explain()' also to StreamTableEnvironment > -- > > Key: FLINK-4599 > URL: https://issues.apache.org/jira/browse/FLINK-4599 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > Labels: starter > > Currently, only the BatchTableEnvironment supports the {{explain}} command > for tables. We should also support it for the StreamTableEnvironment.
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
GitHub user chobeat opened a pull request: https://github.com/apache/flink/pull/2485 [Flink 4599] - Add 'explain()' also to StreamTableEnvironment You can merge this pull request into a Git repository by running: $ git pull https://github.com/radicalbit/flink FLINK-4599 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2485.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2485 commit 29f0204308a8f871e7b27767efc3020e362218f4 Author: chobeatDate: 2016-09-09T09:27:41Z AST DataStream table explain commit 593569405bbd2b728f5e9edc82242019a7fbe9be Author: chobeat Date: 2016-09-09T09:31:12Z Merge remote-tracking branch 'origin/master' into FLINK-4599 commit 7bf135237cb24ed889b92bf5d8026de43d1038fc Author: f7753 Date: 2016-08-22T13:07:55Z [FLINK-4436] Unclosed DataOutputBuffer in Utils#setTokensFor() This closes #2402 commit ba043aaa51401f53c2868927a540ebf7a3493318 Author: Greg Hogan Date: 2016-07-25T13:09:27Z [FLINK-4257] [gelly] Handle delegating algorithm change of class Replaces Delegate with NoOpOperator. This closes #2474 commit 850fd5fec5133f7729bc6a5b2af00cb2decc229b Author: Till Rohrmann Date: 2016-08-31T15:58:09Z [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFlinkMiniCluster Rename _configuration to originalConfiguration Remove testing classes from main scope in flink-runtime Previously, the ForkableFlinkMiniCluster which resided in flink-test-utils required these files to be in the main scope of flink-runtime. With the removal of the ForkableFlinkMiniCluster, these classes are now no longer needed and can be moved back to the test scope. This closes #2450. 
commit 7eecc4bd7aa605051cc3c6abc1a87233b8695127 Author: Till Rohrmann Date: 2016-09-01T12:41:44Z [FLINK-4456] Replace Akka specific types by interfaces in Task Introduce TaskExecutionStateListener for Task Replace JobManagerGateway in Task by InputSplitProvider and CheckpointNotifier Replace the TaskManager ActorGateway by TaskManagerConnection in Task Rename taskmanager.CheckpointNotifier into CheckpointResponder; rename TaskExecutionStateListener.notifyTaskExecutionState into notifyTaskExecutionStateChanged Remove InputSplitProvider.start; add ClassLoader parameter to InputSplitProvider.getNextInputSplit Removes the unused class InputSplitIterator. Update InputSplitProvider JavaDocs This closes #2456. commit 45f842eb8ea7da4f0535cd29c345fd45fe3d3815 Author: Greg Hogan Date: 2016-09-02T14:42:30Z [FLINK-4522] [docs] Gelly link broken in homepage The Gelly documentation was recently split into multiple pages in FLINK-4104 but was missing a redirect. This commit updates the Gelly redirect to point to the old page. This closes #2464 commit 7c9d1679c8319d560c9032691ad05b723b852f66 Author: Greg Hogan Date: 2016-09-02T15:53:08Z [FLINK-4571] [gelly] Configurable little parallelism in Gelly drivers This closes #2475 commit 2ddb4fef89debd7cef740058cc79ddca097879e1 Author: Alexander Pivovarov Date: 2016-09-07T21:11:06Z [FLINK-4595] Close FileOutputStream in ParameterTool This closes #2478 commit 2d4c75e688cd143022fc1db2b209c71935003c7d Author: chobeat Date: 2016-09-09T09:32:13Z Merge branch 'FLINK-4599' of github.com:radicalbit/flink into FLINK-4599 commit f3ba22ba5d2901c25cdbd11689a1ca3cc50935cd Author: chobeat Date: 2016-09-09T12:30:50Z moved stream explain test resources to main folder
[jira] [Commented] (FLINK-4603) KeyedStateBackend cannot restore user code classes
[ https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476941#comment-15476941 ] Till Rohrmann commented on FLINK-4603: -- Wouldn't we even then need the user code class loader to load user code classes? > KeyedStateBackend cannot restore user code classes > -- > > Key: FLINK-4603 > URL: https://issues.apache.org/jira/browse/FLINK-4603 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Priority: Blocker > Fix For: 1.2.0 > > > A user reported that he cannot restore keyed state which contains user code > classes. I suspect that we don't use the user code class loader to > deserialize the state. > The solution seems to be to forward the user code class loader to the > {{KeyedStateBackends}} when restoring state.
[jira] [Commented] (FLINK-4603) KeyedStateBackend cannot restore user code classes
[ https://issues.apache.org/jira/browse/FLINK-4603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476881#comment-15476881 ] Aljoscha Krettek commented on FLINK-4603: - Yep, the user code {{ClassLoader}} can be retrieved from the {{Environment}} passed to {{AbstractStateBackend.restoreKeyedStateBackend()}}. A nicer solution would be to completely get rid of Java Serialization in the state serialization. > KeyedStateBackend cannot restore user code classes > -- > > Key: FLINK-4603 > URL: https://issues.apache.org/jira/browse/FLINK-4603 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Priority: Blocker > Fix For: 1.2.0 > > > A user reported that he cannot restore keyed state which contains user code > classes. I suspect that we don't use the user code class loader to > deserialize the state. > The solution seems to be to forward the user code class loader to the > {{KeyedStateBackends}} when restoring state.
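Forwarding a class loader into Java deserialization, as discussed in FLINK-4603 above, comes down to overriding `ObjectInputStream.resolveClass` so classes are looked up through the supplied loader. A self-contained sketch (the class name and helper are ours, not Flink's actual restore path):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// ObjectInputStream variant that resolves classes against a supplied ClassLoader,
// the way a state backend must use the user-code class loader on restore.
public class ClassLoaderObjectInputStream extends ObjectInputStream {

    private final ClassLoader classLoader;

    public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            return super.resolveClass(desc); // fall back to default resolution
        }
    }

    // Round-trips an object through serialization, deserializing with the given loader.
    public static Object roundTrip(Object value, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        try (ClassLoaderObjectInputStream ois = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()), loader)) {
            return ois.readObject();
        }
    }
}
```

With the default `ObjectInputStream`, `resolveClass` uses the caller's class loader, which is exactly why user-code classes fail to load when state is restored from framework code.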
[jira] [Created] (FLINK-4604) Add support for standard deviation/variance
Timo Walther created FLINK-4604: --- Summary: Add support for standard deviation/variance Key: FLINK-4604 URL: https://issues.apache.org/jira/browse/FLINK-4604 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test and document this rule. Whether we also want to add these aggregates to the Table API is up for discussion.
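The reduction the Calcite rule relies on is plain algebra over SUM(x), SUM(x*x) and COUNT(x). A plain-Java sketch of the reduced forms (our own illustration, not Calcite code):

```java
public class VarianceFromSums {

    // VAR_POP(x) = (SUM(x*x) - SUM(x)^2 / COUNT(x)) / COUNT(x)
    public static double varPop(double sum, double sumSq, long count) {
        return (sumSq - sum * sum / count) / count;
    }

    // VAR_SAMP divides by COUNT(x) - 1 instead of COUNT(x).
    public static double varSamp(double sum, double sumSq, long count) {
        return (sumSq - sum * sum / count) / (count - 1);
    }

    // STDDEV_POP / STDDEV_SAMP are the square roots of the variances.
    public static double stddevPop(double sum, double sumSq, long count) {
        return Math.sqrt(varPop(sum, sumSq, count));
    }

    public static void main(String[] args) {
        // x = {1, 2, 3, 4}: sum = 10, sumSq = 30, count = 4
        System.out.println(varPop(10, 30, 4)); // population variance of {1, 2, 3, 4}
    }
}
```

Because only sums and counts are needed, these aggregates can be computed in a single pass and combined across partitions, which is why rewriting them this way suits a distributed engine.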
[jira] [Resolved] (FLINK-4592) Fix flaky test ScalarFunctionsTest.testCurrentTimePoint
[ https://issues.apache.org/jira/browse/FLINK-4592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-4592. - Resolution: Fixed Fix Version/s: 1.2.0 Fixed in 8c0d62433a3f57d0753edb00e5c2bbc1adc467df. > Fix flaky test ScalarFunctionsTest.testCurrentTimePoint > --- > > Key: FLINK-4592 > URL: https://issues.apache.org/jira/browse/FLINK-4592 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Labels: starter > Fix For: 1.2.0 > > > It seems that the test is still non deterministic. > {code} > org.apache.flink.api.table.expressions.ScalarFunctionsTest > testCurrentTimePoint(org.apache.flink.api.table.expressions.ScalarFunctionsTest) > Time elapsed: 0.083 sec <<< FAILURE! > org.junit.ComparisonFailure: Wrong result for: > AS(>=(CHAR_LENGTH(CAST(CURRENT_TIMESTAMP):VARCHAR(1) CHARACTER SET > "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 22), '_c0') > expected:<[tru]e> but was:<[fals]e> > at org.junit.Assert.assertEquals(Assert.java:115) > at > org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:126) > at > org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:123) > at > scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:87) > at > org.apache.flink.api.table.expressions.utils.ExpressionTestBase.evaluateExprs(ExpressionTestBase.scala:123) > at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
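The non-determinism in the failing assertion (string length of CURRENT_TIMESTAMP >= 22) has a plausible mechanical explanation: `java.sql.Timestamp.toString()` trims trailing zeros from the fractional seconds, so the rendered length varies with the wall clock. A small demonstration, assuming the cast ultimately renders via `Timestamp.toString()` (our reading, not confirmed from the generated code):

```java
import java.sql.Timestamp;

public class TimestampLengthDemo {

    public static int stringLength(long epochMillis) {
        return new Timestamp(epochMillis).toString().length();
    }

    public static void main(String[] args) {
        // "yyyy-mm-dd hh:mm:ss.f" -> 21 chars when the millis happen to be zero...
        System.out.println(stringLength(0L));
        // ...but 23 chars for e.g. ".123" -- so a fixed ">= 22" check is flaky.
        System.out.println(stringLength(123L));
    }
}
```

A robust test would match the string against a pattern or parse it back rather than assert a minimum length.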
[jira] [Updated] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-4505: - Description: Implement {{TaskManagerRunner}} to construct related components ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or standalone mode. (was: Implement {{TaskExecutorFactory}} that should be an abstract class with the helper methods to bring up the {{TaskManager}}. The factory can be implemented by some classes to start a {{TaskManager}} in different modes (testing, standalone, yarn).) > Implement TaskManagerRunner to construct related components for TaskManager > --- > > Key: FLINK-4505 > URL: https://issues.apache.org/jira/browse/FLINK-4505 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang >Priority: Minor > > Implement {{TaskManagerRunner}} to construct related components > ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, > {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or > standalone mode.
[jira] [Updated] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-4505: - Summary: Implement TaskManagerRunner to construct related components for TaskManager (was: Implement TaskManagerFactory to bring up TaskManager for different modes) > Implement TaskManagerRunner to construct related components for TaskManager > --- > > Key: FLINK-4505 > URL: https://issues.apache.org/jira/browse/FLINK-4505 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang >Priority: Minor > > Implement {{TaskExecutorFactory}} that should be an abstract class with the > helper methods to bring up the {{TaskManager}}. The factory can be > implemented by some classes to start a {{TaskManager}} in different modes > (testing, standalone, yarn).
[jira] [Commented] (FLINK-4177) CassandraConnectorTest.testCassandraCommitter causing unstable builds
[ https://issues.apache.org/jira/browse/FLINK-4177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476803#comment-15476803 ] ASF GitHub Bot commented on FLINK-4177: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/2484 [FLINK-4177] Harden CassandraConnectorTest This PR (hopefully) resolves the instability issues with the Cassandra connector tests. Changelog: * updated cassandra/driver versions * the `cassandra.yaml` was cleaned up * removed several configuration values that used the default * sorted the remaining settings in alphabetical order * the at-least-once sinks were modified to * properly log exceptions when close() is called * keep track of how many records were not acknowledged yet * the tests were modified to * start the embedded cassandra instance in a separate process * and supply an array of performance related jvm arguments, taken from the cassandra repo * no longer truncate tables; instead every test uses a separate table * wait until a connection could be established to cassandra in a retry-loop instead of waiting for a fixed time * no longer run actual flink jobs * use increased timeouts * clean up temporary files You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink cass_tmp Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2484.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2484 commit 64b125343839788530729a2119d1dc92e50e849a Author: zentolDate: 2016-09-05T09:03:00Z [FLINK-4177] Harden CassandraConnectorTest > CassandraConnectorTest.testCassandraCommitter causing unstable builds > - > > Key: FLINK-4177 > URL: https://issues.apache.org/jira/browse/FLINK-4177 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: 
Robert Metzger > Labels: test-stability > > This build: https://api.travis-ci.org/jobs/143272982/log.txt?deansi=true > failed with > {code} > 07/08/2016 09:59:12 Job execution switched to status FINISHED. > Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 146.646 sec > <<< FAILURE! - in > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest > testCassandraCommitter(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest) > Time elapsed: 9.057 sec <<< ERROR! > com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout > during write query at consistency LOCAL_SERIAL (1 replica were required but > only 0 acknowledged the write) > at > com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:73) > at > com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:26) > at > com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37) > at > com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245) > at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63) > at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39) > at > org.apache.flink.streaming.connectors.cassandra.CassandraCommitter.open(CassandraCommitter.java:103) > at > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest.testCassandraCommitter(CassandraConnectorTest.java:284) > Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: > Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica > were required but only 0 acknowledged the write) > at > com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:100) > at > com.datastax.driver.core.Responses$Error.asException(Responses.java:122) > at > com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477) > at > 
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005) > at > com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304) > at >
[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/2484 [FLINK-4177] Harden CassandraConnectorTest This PR (hopefully) resolves the instability issues with the Cassandra connector tests. Changelog: * updated cassandra/driver versions * the `cassandra.yaml` was cleaned up * removed several configuration values that used the default * sorted the remaining settings in alphabetical order * the at-least-once sinks were modified to * properly log exceptions when close() is called * keep track of how many records were not acknowledged yet * the tests were modified to * start the embedded cassandra instance in a separate process * and supply an array of performance related jvm arguments, taken from the cassandra repo * no longer truncate tables; instead every test uses a separate table * wait until a connection could be established to cassandra in a retry-loop instead of waiting for a fixed time * no longer run actual flink jobs * use increased timeouts * clean up temporary files You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink cass_tmp Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2484.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2484 commit 64b125343839788530729a2119d1dc92e50e849a Author: zentolDate: 2016-09-05T09:03:00Z [FLINK-4177] Harden CassandraConnectorTest --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
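The retry-loop change described above ("wait until a connection could be established to cassandra in a retry-loop instead of waiting for a fixed time") can be sketched generically. This is a hypothetical illustration, not the actual test code from the PR; the class and method names are invented, and the real change targets the DataStax driver connection:

```java
import java.util.concurrent.Callable;

public class RetryConnect {

    // Keep invoking `connect` until it succeeds or the deadline passes,
    // backing off briefly between attempts. Replacing a fixed sleep with a
    // loop like this avoids both flaky timeouts (waiting too little on a
    // slow CI machine) and wasted build time (always waiting too long).
    public static <T> T retryUntil(Callable<T> connect, long timeoutMillis, long backoffMillis)
            throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            try {
                return connect.call();
            } catch (Exception e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // give up: the service never became reachable
                }
            }
            Thread.sleep(backoffMillis);
        }
    }
}
```

In the Cassandra tests the `Callable` would attempt to open a driver session against the embedded instance started in the separate process.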
[jira] [Assigned] (FLINK-4592) Fix flaky test ScalarFunctionsTest.testCurrentTimePoint
[ https://issues.apache.org/jira/browse/FLINK-4592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-4592: --- Assignee: Timo Walther > Fix flaky test ScalarFunctionsTest.testCurrentTimePoint > --- > > Key: FLINK-4592 > URL: https://issues.apache.org/jira/browse/FLINK-4592 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Labels: starter > > It seems that the test is still non deterministic. > {code} > org.apache.flink.api.table.expressions.ScalarFunctionsTest > testCurrentTimePoint(org.apache.flink.api.table.expressions.ScalarFunctionsTest) > Time elapsed: 0.083 sec <<< FAILURE! > org.junit.ComparisonFailure: Wrong result for: > AS(>=(CHAR_LENGTH(CAST(CURRENT_TIMESTAMP):VARCHAR(1) CHARACTER SET > "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 22), '_c0') > expected:<[tru]e> but was:<[fals]e> > at org.junit.Assert.assertEquals(Assert.java:115) > at > org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:126) > at > org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:123) > at > scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:87) > at > org.apache.flink.api.table.expressions.utils.ExpressionTestBase.evaluateExprs(ExpressionTestBase.scala:123) > at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
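A plausible source of the nondeterminism reported above: the assertion expects `CHAR_LENGTH(CAST(CURRENT_TIMESTAMP AS VARCHAR)) >= 22`, but the string form of a timestamp does not have a fixed length, because trailing zeros in the fractional seconds are trimmed. A minimal JVM-level illustration of the effect (hypothetical; the Flink test goes through the Table API cast, not `java.sql.Timestamp.toString()`):

```java
import java.sql.Timestamp;

public class TimestampLength {
    public static void main(String[] args) {
        // When the fractional part is zero, toString() renders it as ".0",
        // so the resulting string is shorter than when millis are present.
        Timestamp exact  = Timestamp.valueOf("2016-09-09 12:00:00");
        Timestamp millis = Timestamp.valueOf("2016-09-09 12:00:00.123");

        System.out.println(exact.toString().length());  // 21 ("2016-09-09 12:00:00.0")
        System.out.println(millis.toString().length()); // 23 ("2016-09-09 12:00:00.123")
    }
}
```

Any length-based assertion on such a rendering will pass or fail depending on the wall-clock instant at which the test runs.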
[GitHub] flink pull request #2483: [FLINK-4601] Check for empty string properly
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2483
[jira] [Commented] (FLINK-4601) Check for empty string properly
[ https://issues.apache.org/jira/browse/FLINK-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476725#comment-15476725 ] ASF GitHub Bot commented on FLINK-4601: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2483 > Check for empty string properly > --- > > Key: FLINK-4601 > URL: https://issues.apache.org/jira/browse/FLINK-4601 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.2 >Reporter: Alexander Pivovarov >Priority: Trivial > Fix For: 1.2.0 > > > UdfAnalyzerExamplesTest.java and UdfAnalyzerTest.java use == to check for > empty string. We should use isEmpty() instead
[jira] [Resolved] (FLINK-4601) Check for empty string properly
[ https://issues.apache.org/jira/browse/FLINK-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-4601. - Resolution: Fixed Fix Version/s: 1.2.0 Fixed in e92f91aeba3c9e8da5a5ff9efda342cb19da928c. > Check for empty string properly > --- > > Key: FLINK-4601 > URL: https://issues.apache.org/jira/browse/FLINK-4601 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.2 >Reporter: Alexander Pivovarov >Priority: Trivial > Fix For: 1.2.0 > > > UdfAnalyzerExamplesTest.java and UdfAnalyzerTest.java use == to check for > empty string. We should use isEmpty() instead
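For context on the bug fixed above: `==` on strings compares object references, not contents, so it only happens to match the interned `""` literal. A minimal standalone sketch (not the actual test code from UdfAnalyzerTest.java):

```java
public class EmptyStringCheck {
    public static void main(String[] args) {
        String fromLiteral = "";
        String built = new StringBuilder().toString(); // empty, but a distinct object

        System.out.println(fromLiteral == "");   // true  (same interned literal)
        System.out.println(built == "");         // false (reference comparison fails)
        System.out.println(built.isEmpty());     // true  (content comparison works)
    }
}
```

Because an empty string can arrive as any object, `isEmpty()` (or `equals("")`) is the only reliable check.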
[jira] [Commented] (FLINK-4601) Check for empty string properly
[ https://issues.apache.org/jira/browse/FLINK-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476719#comment-15476719 ] ASF GitHub Bot commented on FLINK-4601: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2483 Thanks for the fix. Will merge... > Check for empty string properly > --- > > Key: FLINK-4601 > URL: https://issues.apache.org/jira/browse/FLINK-4601 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.2 >Reporter: Alexander Pivovarov >Priority: Trivial > > UdfAnalyzerExamplesTest.java and UdfAnalyzerTest.java use == to check for > empty string. We should use isEmpty() instead
[GitHub] flink issue #2483: [FLINK-4601] Check for empty string properly
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2483 Thanks for the fix. Will merge...
[jira] [Created] (FLINK-4603) KeyedStateBackend cannot restore user code classes
Till Rohrmann created FLINK-4603: Summary: KeyedStateBackend cannot restore user code classes Key: FLINK-4603 URL: https://issues.apache.org/jira/browse/FLINK-4603 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Till Rohrmann Priority: Blocker Fix For: 1.2.0 A user reported that he cannot restore keyed state which contains user code classes. I suspect that we don't use the user code class loader to deserialize the state. The solution seems to be to forward the user code class loader to the {{KeyedStateBackends}} when restoring state.
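The fix proposed above, forwarding the user code class loader when restoring state, boils down to resolving classes against that loader during deserialization. A hedged sketch using plain Java serialization (the class name and structure are illustrative, not Flink's actual state backend API):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Illustrative: an ObjectInputStream that tries the user-code class loader
// first, so that state containing classes from the user's job jar can be
// restored even though those classes are invisible to the system loader.
public class UserCodeObjectInputStream extends ObjectInputStream {
    private final ClassLoader userCodeClassLoader;

    public UserCodeObjectInputStream(InputStream in, ClassLoader userCodeClassLoader)
            throws IOException {
        super(in);
        this.userCodeClassLoader = userCodeClassLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Resolve against the user code class loader first...
            return Class.forName(desc.getName(), false, userCodeClassLoader);
        } catch (ClassNotFoundException e) {
            // ...and fall back to the default resolution for system classes.
            return super.resolveClass(desc);
        }
    }
}
```

Without such forwarding, deserializing restored state throws `ClassNotFoundException` for any user-defined class, which matches the symptom reported in this issue.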
[jira] [Commented] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476495#comment-15476495 ] Fabian Hueske commented on FLINK-4599: -- I would leave that up to you. If you come up with a good implementation to obtain the physical plan, that would be great. But we can also start with the Calcite AST and do the physical plan in a separate issue. Whatever you prefer :-) > Add 'explain()' also to StreamTableEnvironment > -- > > Key: FLINK-4599 > URL: https://issues.apache.org/jira/browse/FLINK-4599 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > Labels: starter > > Currently, only the BatchTableEnvironment supports the {{explain}} command > for tables. We should also support it for the StreamTableEnvironment.
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15476482#comment-15476482 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78154021 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField; + +public final class PojoComparatorGenerator { + private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated"; + + private transient Field[] keyFields; + private transient Integer[] keyFieldIds; + private final TypeComparator[] comparators; + private final TypeSerializer serializer; + private final Class type; + private final ExecutionConfig config; + private String code; + + public PojoComparatorGenerator(Field[] keyFields, TypeComparator[] comparators, TypeSerializer serializer, + Class type, Integer[] keyFieldIds, ExecutionConfig config) { + this.keyFields = keyFields; + this.comparators = comparators; + + this.type = type; + this.serializer = serializer; + this.keyFieldIds = keyFieldIds; + this.config = config; + } + + public TypeComparator createComparator() { + // Multiple comparators can be generated for each type based on a list of keys. The list of keys and the type + // name should determine the generated comparator. This information is used for caching (avoiding + // recompilation). Note that, the name of the field is not sufficient because nested POJOs might have a field + // with the name. + StringBuilder keyBuilder = new StringBuilder(); + for(Integer i : keyFieldIds) { + keyBuilder.append(i); + keyBuilder.append("_"); + } + final String className = type.getCanonicalName().replace('.', '_') + "_GeneratedComparator" + + keyBuilder.toString(); + final String fullClassName = packageName + "." 
+ className; + code = InstantiationUtil.getCodeForCachedClass(fullClassName); + if (code == null) { + generateCode(className); + } + return new GenTypeComparatorProxy<>(type, fullClassName, code, comparators, serializer); + } + + + private void generateCode(String className) { + String typeName = type.getCanonicalName(); + StringBuilder members = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + members.append(String.format("final TypeComparator f%d;\n", i)); --- End diff -- same specialization here as in `PojoSerializerGenerator` > GSoC: Code Generation in Serializers > > > Key: FLINK-3599 > URL: https://issues.apache.org/jira/browse/FLINK-3599 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Márton Balassi >Assignee: Gabor Horvath > Labels: gsoc2016,
[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78154021 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField; + +public final class PojoComparatorGenerator { + private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated"; + + private transient Field[] keyFields; + private transient Integer[] keyFieldIds; + private final TypeComparator[] comparators; + private final TypeSerializer serializer; + private final Class type; + private final ExecutionConfig config; + private String code; + + public PojoComparatorGenerator(Field[] keyFields, TypeComparator[] comparators, TypeSerializer serializer, + Class type, Integer[] keyFieldIds, ExecutionConfig config) { + this.keyFields = keyFields; + this.comparators = comparators; + + this.type = type; + this.serializer = serializer; + this.keyFieldIds = keyFieldIds; + this.config = config; + } + + public TypeComparator createComparator() { + // Multiple comparators can be generated for each type based on a list of keys. The list of keys and the type + // name should determine the generated comparator. This information is used for caching (avoiding + // recompilation). Note that, the name of the field is not sufficient because nested POJOs might have a field + // with the name. + StringBuilder keyBuilder = new StringBuilder(); + for(Integer i : keyFieldIds) { + keyBuilder.append(i); + keyBuilder.append("_"); + } + final String className = type.getCanonicalName().replace('.', '_') + "_GeneratedComparator" + + keyBuilder.toString(); + final String fullClassName = packageName + "." 
+ className; + code = InstantiationUtil.getCodeForCachedClass(fullClassName); + if (code == null) { + generateCode(className); + } + return new GenTypeComparatorProxy<>(type, fullClassName, code, comparators, serializer); + } + + + private void generateCode(String className) { + String typeName = type.getCanonicalName(); + StringBuilder members = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + members.append(String.format("final TypeComparator f%d;\n", i)); --- End diff -- same specialization here as in `PojoSerializerGenerator` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---