[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333232#comment-15333232 ] ASF GitHub Bot commented on FLINK-3650: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67294752 --- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.operator + +import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.scala._ +import org.junit.Test +import org.junit.Assert + +class MaxByOperatorTest { + private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]() --- End diff -- ` C:\flink\flink\flink-scala\src\test\scala\org\apache\flink\api\operator\MaxByOperatorTest.scala:27: error: wrong number of type parameters for method apply: [A](xs: A*)List[A] in object List [ERROR] private val emptyTupleData = List[Int, Long, String, Long, Int]() [ERROR]^ [ERROR] C:\flink\flink\flink-scala\src\test\scala\org\apache\flink\api\operator\MinByOperatorTest.scala:27: error: wrong number of type parameters for method apply: [A](xs: A*)List[A] in object List [ERROR] private val emptyTupleData = List[Int, Long, String, Long, Int]() ` Even this fails. Let me read up on Scala to see the correct syntax here. > Add maxBy/minBy to Scala DataSet API > > > Key: FLINK-3650 > URL: https://issues.apache.org/jira/browse/FLINK-3650 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan > > The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. > These methods are not supported by the Scala DataSet API. These methods > should be added in order to have a consistent API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
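For reference on the syntax question above: Scala's `List` takes exactly one type parameter, so a tuple element type has to be wrapped in parentheses (the shorthand for `scala.Tuple5`). A minimal sketch of the equivalent spellings:

```scala
// Scala's List[A] has a single type parameter, so a five-field tuple element
// type needs its own parentheses: (Int, Long, String, Long, Int) is shorthand
// for scala.Tuple5[Int, Long, String, Long, Int].
object EmptyTupleListExample {
  def main(args: Array[String]): Unit = {
    val a = List[(Int, Long, String, Long, Int)]()             // idiomatic
    val b = List.empty[(Int, Long, String, Long, Int)]         // also idiomatic
    val c = List[scala.Tuple5[Int, Long, String, Long, Int]]() // verbose but valid
    assert(a == b && b == c)
    // List[Int, Long, String, Long, Int]() does not compile: List expects a
    // single type parameter, hence "wrong number of type parameters".
  }
}
```

In other words, the `List[scala.Tuple5[Int, Long, String, Long, Int]]()` form already in the test is correct; `List[(Int, Long, String, Long, Int)]()` is merely the shorter spelling of the same type.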
[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers
[ https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333196#comment-15333196 ] ASF GitHub Bot commented on FLINK-3758: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/1979 @knaufk Could you please close this now that the Jira issue is also closed? > Add possibility to register accumulators in custom triggers > --- > > Key: FLINK-3758 > URL: https://issues.apache.org/jira/browse/FLINK-3758 > Project: Flink > Issue Type: Improvement >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Minor > > For monitoring purposes it would be nice to be able to use accumulators in > custom trigger functions. > Basically, the trigger context could just expose {{getAccumulator}} of > {{RuntimeContext}} or does this create problems I am not aware of? > Adding accumulators in a trigger function is more difficult, I think, but > that's not really necessary as the accumulator could just be added in some > other upstream operator.
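To make the proposal concrete, here is a self-contained sketch of what exposing {{getAccumulator}} through the trigger context could look like. Every name below is an illustrative stand-in, not actual Flink API:

```scala
// Illustrative sketch of the proposal, with no Flink dependency: a minimal
// stand-in for an accumulator, and a hypothetical trigger context that
// forwards lookups the way RuntimeContext#getAccumulator would.
class LocalCounter {
  private var value = 0L
  def add(n: Long): Unit = { value += n }
  def get: Long = value
}

// hypothetical: the trigger context exposes accumulators registered upstream
class TriggerContextSketch(accumulators: Map[String, LocalCounter]) {
  def getAccumulator(name: String): LocalCounter = accumulators(name)
}

object TriggerMonitoringDemo {
  def main(args: Array[String]): Unit = {
    val ctx = new TriggerContextSketch(Map("trigger-firings" -> new LocalCounter))
    // a custom trigger would do this on each firing, purely for monitoring
    ctx.getAccumulator("trigger-firings").add(1L)
    assert(ctx.getAccumulator("trigger-firings").get == 1L)
  }
}
```

This matches the issue's suggestion that the accumulator itself is registered by some upstream operator; the trigger only fetches and updates it.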
[jira] [Assigned] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.
[ https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4068: -- Assignee: Jark Wu > Move constant computations out of code-generated `flatMap` functions. > - > > Key: FLINK-4068 > URL: https://issues.apache.org/jira/browse/FLINK-4068 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Jark Wu > > The generated functions for expressions of the Table API or SQL include > constant computations. > For instance the code generated for a predicate like: > {code} > myInt < (10 + 20) > {code} > looks roughly like: > {code} > public void flatMap(Row in, Collector out) { > Integer in1 = in.productElement(1); > int temp = 10 + 20; > if (in1 < temp) { > out.collect(in) > } > } > {code} > In this example the computation of {{temp}} is constant and could be moved > out of the {{flatMap()}} method. > The same might apply for generated functions other than {{FlatMap}} as well.
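The intended transformation can be illustrated with a sketch (not actual code-generator output): the constant subexpression is reduced once, outside the per-record method, instead of on every invocation.

```scala
// Sketch of the proposed improvement; not the actual generated code.
// Before: the constant 10 + 20 is re-evaluated for every record.
class GeneratedBefore {
  def flatMap(in1: Int, collect: Int => Unit): Unit = {
    val temp = 10 + 20 // constant work inside the hot path
    if (in1 < temp) collect(in1)
  }
}

// After: the constant is folded once, outside flatMap.
class GeneratedAfter {
  private val temp = 30 // reduced at code-generation time
  def flatMap(in1: Int, collect: Int => Unit): Unit =
    if (in1 < temp) collect(in1)
}
```

Both versions emit exactly the same records; only the per-record cost changes.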
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333183#comment-15333183 ] ASF GitHub Bot commented on FLINK-3650: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67291576 --- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.operator + +import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.scala._ +import org.junit.Test +import org.junit.Assert + +class MaxByOperatorTest { + private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]() --- End diff -- I think this is what you meant `List[Int, Long, String, Long, Int]()` - removing that explicit scala.Tuple5. 
> Add maxBy/minBy to Scala DataSet API > > > Key: FLINK-3650 > URL: https://issues.apache.org/jira/browse/FLINK-3650 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan > > The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. > These methods are not supported by the Scala DataSet API. These methods > should be added in order to have a consistent API.
[GitHub] flink pull request #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67291510 --- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.operator + +import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.scala._ +import org.junit.Test +import org.junit.Assert + +class MaxByOperatorTest { + private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]() --- End diff -- Should this be `List[(Int, Long, String, Long, Int)]` or `List(Int, Long, String, Long, Int)`? Because if it is the former, `val collection = env.fromCollection(emptyTupleData)` throws a compilation error, and if it is the latter, the build fails with `[ERROR] /home/travis/build/apache/flink/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala:27: error: object java.lang.String is not a value [ERROR] private val emptyTupleData = List(Int, Long, String, Long, Int)` Only 'List[scala.Tuple5[Int, Long, String, Long, Int]]()' was not throwing any exception this way.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-3758) Add possibility to register accumulators in custom triggers
[ https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantin Knauf closed FLINK-3758. --- Resolution: Resolved > Add possibility to register accumulators in custom triggers > --- > > Key: FLINK-3758 > URL: https://issues.apache.org/jira/browse/FLINK-3758 > Project: Flink > Issue Type: Improvement >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Minor > > For monitoring purposes it would be nice to be able to use accumulators in > custom trigger functions. > Basically, the trigger context could just expose {{getAccumulator}} of > {{RuntimeContext}} or does this create problems I am not aware of? > Adding accumulators in a trigger function is more difficult, I think, but > that's not really necessary as the accumulator could just be added in some > other upstream operator.
[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers
[ https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333144#comment-15333144 ] Konstantin Knauf commented on FLINK-3758: - Sure. I see you already opened a PR for it. Great. > Add possibility to register accumulators in custom triggers > --- > > Key: FLINK-3758 > URL: https://issues.apache.org/jira/browse/FLINK-3758 > Project: Flink > Issue Type: Improvement >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Minor > > For monitoring purposes it would be nice to be able to use accumulators in > custom trigger functions. > Basically, the trigger context could just expose {{getAccumulator}} of > {{RuntimeContext}} or does this create problems I am not aware of? > Adding accumulators in a trigger function is more difficult, I think, but > that's not really necessary as the accumulator could just be added in some > other upstream operator.
[jira] [Updated] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3734: -- Description: {code} DataInputView in = inputState.getState(getUserCodeClassloader()); final long nextEvaluationTime = in.readLong(); final long nextSlideTime = in.readLong(); AbstractKeyedTimePanes panes = createPanes(keySelector, function); panes.readFromInput(in, keySerializer, stateTypeSerializer); restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime); } {code} DataInputView in is not closed upon return. was: {code} DataInputView in = inputState.getState(getUserCodeClassloader()); final long nextEvaluationTime = in.readLong(); final long nextSlideTime = in.readLong(); AbstractKeyedTimePanes panes = createPanes(keySelector, function); panes.readFromInput(in, keySerializer, stateTypeSerializer); restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime); } {code} DataInputView in is not closed upon return. > Unclosed DataInputView in > AbstractAlignedProcessingTimeWindowOperator#restoreState() > > > Key: FLINK-3734 > URL: https://issues.apache.org/jira/browse/FLINK-3734 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > DataInputView in = inputState.getState(getUserCodeClassloader()); > final long nextEvaluationTime = in.readLong(); > final long nextSlideTime = in.readLong(); > AbstractKeyedTimePanes panes = > createPanes(keySelector, function); > panes.readFromInput(in, keySerializer, stateTypeSerializer); > restoredState = new RestoredState<>(panes, nextEvaluationTime, > nextSlideTime); > } > {code} > DataInputView in is not closed upon return. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
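A sketch of the fix pattern the issue implies: close the view on every exit path, including exceptions. A plain `DataInputStream` stands in for Flink's `DataInputView` here, so this is an illustration of the pattern, not the actual `restoreState()` code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Illustrative pattern for the fix: wrap the reads in try/finally so the
// stream is closed whether reading succeeds, throws, or returns early.
// DataInputStream stands in for Flink's DataInputView.
object CloseOnAllPaths {
  def readTimes(bytes: Array[Byte]): (Long, Long) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    try {
      val nextEvaluationTime = in.readLong()
      val nextSlideTime = in.readLong()
      (nextEvaluationTime, nextSlideTime)
    } finally {
      in.close() // runs on every path, fixing the leak described above
    }
  }
}
```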
[jira] [Updated] (FLINK-3753) KillerWatchDog should not use kill on toKill thread
[ https://issues.apache.org/jira/browse/FLINK-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3753: -- Description: {code} // this is harsh, but this watchdog is a last resort if (toKill.isAlive()) { toKill.stop(); } {code} stop() is deprecated. See: https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads was: {code} // this is harsh, but this watchdog is a last resort if (toKill.isAlive()) { toKill.stop(); } {code} stop() is deprecated. See: https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads > KillerWatchDog should not use kill on toKill thread > --- > > Key: FLINK-3753 > URL: https://issues.apache.org/jira/browse/FLINK-3753 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > // this is harsh, but this watchdog is a last resort > if (toKill.isAlive()) { > toKill.stop(); > } > {code} > stop() is deprecated. > See: > https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads -- This message was sent by Atlassian JIRA (v6.3.4#6332)
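One possible shape of the suggested change (a sketch, not the actual KillerWatchDog code): replace the deprecated `stop()` with `interrupt()` plus a bounded `join()`, so the target thread exits cooperatively. `stop()` is dangerous because it releases monitors while shared state may be mid-update.

```scala
// Illustrative alternative to Thread.stop(): request termination with
// interrupt() and wait a bounded time for the thread to wind down.
object CooperativeShutdown {
  def main(args: Array[String]): Unit = {
    val toKill = new Thread(new Runnable {
      def run(): Unit = {
        try {
          while (!Thread.currentThread().isInterrupted) {
            Thread.sleep(50) // stands in for the thread's real work
          }
        } catch {
          case _: InterruptedException => // interrupted during sleep: exit
        }
      }
    })
    toKill.start()
    toKill.interrupt() // the last resort becomes a request, not a hard kill
    toKill.join(5000)  // bounded wait instead of stop()
    assert(!toKill.isAlive)
  }
}
```

A thread that ignores interruption would of course defeat this, which is presumably why the watchdog used `stop()` in the first place; the trade-off is between a hung thread and corrupted shared state.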
[jira] [Updated] (FLINK-4053) Return value from Connection should be checked against null
[ https://issues.apache.org/jira/browse/FLINK-4053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-4053: -- Description: In RMQSource.java and RMQSink.java, there is code in the following pattern: {code} connection = factory.newConnection(); channel = connection.createChannel(); {code} According to https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel() : {code} Returns: a new channel descriptor, or null if none is available {code} The return value should be checked against null. was: In RMQSource.java and RMQSink.java, there is code in the following pattern: {code} connection = factory.newConnection(); channel = connection.createChannel(); {code} According to https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel() : {code} Returns: a new channel descriptor, or null if none is available {code} The return value should be checked against null. > Return value from Connection should be checked against null > --- > > Key: FLINK-4053 > URL: https://issues.apache.org/jira/browse/FLINK-4053 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > In RMQSource.java and RMQSink.java, there is code in the following pattern: > {code} > connection = factory.newConnection(); > channel = connection.createChannel(); > {code} > According to > https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel() > : > {code} > Returns: > a new channel descriptor, or null if none is available > {code} > The return value should be checked against null. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
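A minimal sketch of the guard the issue asks for. `Connection` and `Channel` below are simplified stand-ins for the RabbitMQ client interfaces, so the code stays self-contained; the point is only the null check and the early, descriptive failure.

```scala
// Illustrative guard: createChannel() is documented to return null when no
// channel is available, so the result must be validated before use.
// Connection and Channel are simplified stand-ins, not the RabbitMQ types.
trait Channel
trait Connection { def createChannel(): Channel }

object ChannelGuard {
  def openChannel(connection: Connection): Channel = {
    val channel = connection.createChannel()
    if (channel == null) {
      // fail fast with a clear message instead of a later NullPointerException
      throw new IllegalStateException("No RabbitMQ channel is currently available")
    }
    channel
  }
}
```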
[GitHub] flink pull request #2102: [FLINK-4068] [tableAPI] Move constant computations...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2102#discussion_r67276795 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala --- @@ -146,4 +148,21 @@ class SelectITCase( tEnv.sql(sqlQuery) } + @Test + def testConstantReduce(): Unit = { --- End diff -- It's a good idea. I will try it later. And the CI throws a `cannot translate call AS...` error; I will figure it out today.
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67257863 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -0,0 +1,1075 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.StringParser; +import org.junit.Test; + +public class RowCsvInputFormatTest { + + private static final Path PATH = new Path("an/ignored/file/"); + + //Static variables for testing the removal of \r\n to \n + private static final String FIRST_PART = "That is the first part"; + + private static final String SECOND_PART = "That is the second part"; + + @Test + public void ignoreInvalidLines() { + try { + String fileContent = + "header1|header2|header3|\n"+ + "this is|1|2.0|\n"+ + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO + }); + CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo); + format.setLenient(false); + + Configuration parameters = new Configuration(); + format.configure(parameters); + format.open(split); + + Row result = new Row(3); + + try { + result = format.nextRecord(result); + fail("Parse Exception was not thrown! (Invalid int value)"); + } catch (ParseException ex) { + } + + // if format has lenient == false this can be asserted only after FLINK-3908 +// result = format.nextRecord(result); +// assertNotNull(result); +// assertEquals("this is", result.productElement(0)); +// assertEquals(new Integer(1), result.productElement(1)); +// assertEquals(new Double(2.0), result.productElement(2)); +// +// result = format.nextRecord(result); +// assertNotNull(result); +// assertEquals("a test", result.productElement
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332755#comment-15332755 ] ASF GitHub Bot commented on FLINK-3901: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67257812 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -0,0 +1,1075 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+    private static final Path PATH = new Path("an/ignored/file/");
+
+    // Static variables for testing the removal of \r\n to \n
+    private static final String FIRST_PART = "That is the first part";
+
+    private static final String SECOND_PART = "That is the second part";
+
+    @Test
+    public void ignoreInvalidLines() {
+        try {
+            String fileContent =
+                "header1|header2|header3|\n" +
+                "this is|1|2.0|\n" +
+                "//a comment\n" +
+                "a test|3|4.0|\n" +
+                "#next|5|6.0|\n";
+
+            FileInputSplit split = createTempFile(fileContent);
+
+            RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {
+                BasicTypeInfo.STRING_TYPE_INFO,
+                BasicTypeInfo.INT_TYPE_INFO,
+                BasicTypeInfo.DOUBLE_TYPE_INFO
+            });
+            CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+            format.setLenient(false);
+
+            Configuration parameters = new Configuration();
+            format.configure(parameters);
+            format.open(split);
+
+            Row result = new Row(3);
+
+            try {
+                result = format.nextRecord(result);
+                fail("Parse Exception was not thrown! (Invalid int value)");
+            } catch (ParseException ex) {
+            }
+
+            // if format has lenient == false this can be asserted only after FLINK-3908
+//            result = format.nextRecord(result);

--- End diff --

Enable after FLINK-3908 was fixed.

> Create a RowCsvInputFormat to use as default CSV IF in Table API
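The test above pins down the strict (`setLenient(false)`) behavior: a malformed record must raise a `ParseException` instead of being skipped. As a Flink-free illustration of the contract being tested, here is a minimal sketch of lenient vs. strict line handling; the class and method names are invented for this example and are not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Flink code): a strict parser throws on a bad
// record, while a lenient one skips it -- the behavior that
// ignoreInvalidLines() exercises on RowCsvInputFormat.
public class LenientCsvSketch {

    static class ParseException extends RuntimeException {
        ParseException(String msg) { super(msg); }
    }

    // Parses "name|int|double|" style lines into Object[] rows.
    static List<Object[]> parse(String content, boolean lenient) {
        List<Object[]> rows = new ArrayList<>();
        for (String line : content.split("\n")) {
            String[] fields = line.split("\\|");
            try {
                rows.add(new Object[] {
                    fields[0],
                    Integer.parseInt(fields[1]),
                    Double.parseDouble(fields[2])
                });
            } catch (RuntimeException e) {
                if (!lenient) {
                    throw new ParseException("Invalid record: " + line);
                }
                // lenient: silently skip the malformed line
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        String content = "this is|1|2.0|\n//a comment\na test|3|4.0|";
        System.out.println(parse(content, true).size());  // comment line skipped
        try {
            parse(content, false);
        } catch (ParseException e) {
            System.out.println("strict mode threw: " + e.getMessage());
        }
    }
}
```

The FLINK-3908 note in the review makes sense in this light: until lenient/strict semantics are consistent, the follow-up assertions on the records after the bad line cannot be enabled.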
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332766#comment-15332766 ] ASF GitHub Bot commented on FLINK-3901: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/1989 Hi @fpompermaier, thanks for the PR. The `flink-table` module is almost completely implemented in Scala. We do not have good experience with mixed Scala / Java modules and I would like to ask if you could port the Java classes to Scala. I haven't had a detailed look at the test yet. > Create a RowCsvInputFormat to use as default CSV IF in Table API > > > Key: FLINK-3901 > URL: https://issues.apache.org/jira/browse/FLINK-3901 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier >Priority: Minor > Labels: csv, null-values, row, tuple > > At the moment the Table APIs reads CSVs using the TupleCsvInputFormat, that > has the big limitation of 25 fields and null handling. > A new IF producing Row object is indeed necessary to avoid those limitations -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332758#comment-15332758 ]

ASF GitHub Bot commented on FLINK-3901: --- Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67257863

    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67257728

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala ---
    @@ -25,6 +25,8 @@
     import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, TupleTypeInfo}
     import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
     import org.apache.flink.api.table.Row
     import org.apache.flink.core.fs.Path
    +import org.apache.flink.api.table.typeutils.RowTypeInfo
    +import org.apache.flink.api.java.io.RowCsvInputFormat

     /**
      * A [[TableSource]] for simple CSV files with up to 25 fields.

    --- End diff --

    Remove 25 field limitation

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
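The 25-field ceiling mentioned in that Scaladoc comes from Flink's fixed-arity `Tuple1`..`Tuple25` classes, whereas a `Row` is in essence a positional array of objects of arbitrary arity whose fields may also be null. A toy illustration of the difference (the `Row` class here is a simplified stand-in, not Flink's `org.apache.flink.api.table.Row`):

```java
// Toy illustration of why Row-based CSV input lifts the Tuple25 ceiling:
// a tuple class has one field per type parameter (fixed arity), while a
// row holds an arbitrary-arity, null-friendly array of fields.
public class RowVsTupleSketch {

    // Fixed arity: one class per width; Flink stops generating at Tuple25.
    static class Tuple2<A, B> {
        final A f0; final B f1;
        Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Arbitrary arity, fields settable to null -- the shape of a table Row.
    static class Row {
        private final Object[] fields;
        Row(int arity) { this.fields = new Object[arity]; }
        void setField(int i, Object value) { fields[i] = value; }
        Object getField(int i) { return fields[i]; }
        int arity() { return fields.length; }
    }

    public static void main(String[] args) {
        Row wide = new Row(40);           // more than 25 fields: fine for a Row
        wide.setField(0, "name");
        wide.setField(39, null);          // null values are representable
        System.out.println(wide.arity());
    }
}
```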
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332750#comment-15332750 ] ASF GitHub Bot commented on FLINK-3901: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67257606 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHRow WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+    private static final long serialVersionUID = 1L;
+
+    private RowSerializer rowSerializer;
+
+    public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
+    }
+
+    public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
+        this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
+    }
+
+    public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
+    }
+
+    public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
+            int[] includedFieldsMask) {
+        super(filePath);
+        boolean[] mask = (includedFieldsMask == null)
+            ? createDefaultMask(rowTypeInfo.getArity())
+            : toBooleanMask(includedFieldsMask);
+        configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+    }
+
+    public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
+    }
+
+    public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
+            boolean[] includedFieldsMask) {
+        super(filePath);
+        configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
+    }
+
+    private void configure(String lineDelimiter, String fieldDelimiter,
+            RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+        if (rowTypeInfo.getArity() == 0) {
+            throw new IllegalArgumentException("Row arity must be greater than 0.");
+        }
+
+        if (includedFieldsMask == null) {
+            includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
+        }
+
+        rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
+
+        setDelimiter(lineDelimiter);
+        setFieldDelimiter(fieldDelimiter);
+
+        Class[] classes = new Class[rowTypeInfo.getArity()];
+
+        for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+            classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+        }
+
+        setFieldsGeneric(includedFieldsMask, classes);
+    }
+
+    @Override
+    public Row fillRecord(Row reuse, Object[] parsedValues) {
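The convenience constructors quoted above funnel an optional `includedFieldsMask` into a boolean flag per CSV column (defaulting to "all fields included"). A plain-Java sketch of what that mask handling boils down to; the semantics are inferred from the constructor signatures, not copied from the Flink `CsvInputFormat` source:

```java
import java.util.Arrays;

// Sketch of the field-mask logic behind the RowCsvInputFormat constructors
// (assumed semantics of createDefaultMask/toBooleanMask, for illustration).
public class FieldMaskSketch {

    // All fields included: one 'true' per field of the row type.
    static boolean[] createDefaultMask(int arity) {
        boolean[] mask = new boolean[arity];
        Arrays.fill(mask, true);
        return mask;
    }

    // int[] of selected CSV column positions -> boolean flag per column,
    // sized to cover the highest selected position.
    static boolean[] toBooleanMask(int[] included) {
        int max = 0;
        for (int pos : included) {
            if (pos < 0) {
                throw new IllegalArgumentException("Position must not be negative.");
            }
            max = Math.max(max, pos);
        }
        boolean[] mask = new boolean[max + 1];
        for (int pos : included) {
            mask[pos] = true;
        }
        return mask;
    }

    public static void main(String[] args) {
        // Row arity 3, but read only CSV columns 0 and 2 of the file.
        System.out.println(Arrays.toString(toBooleanMask(new int[] {0, 2})));
        System.out.println(Arrays.toString(createDefaultMask(3)));
    }
}
```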
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67257499

    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332724#comment-15332724 ]

ASF GitHub Bot commented on FLINK-3901: --- Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67256445

    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67254028

    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    (full file context as quoted earlier in this thread; the comment is anchored on this line:)

    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());

    --- End diff --

    We do not need the `RowSerializer`. It is sufficient to remember the number of `Row` fields (`rowTypeInfo.getArity()`).

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
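The simplification fhueske suggests — keeping only the row arity instead of a full `RowSerializer` — can be sketched in isolation. The classes below are hypothetical, framework-free stand-ins for Flink's `Row` and the input format, not the actual API:

```java
// Minimal stand-in for Flink's Row: only the field count (arity) matters
// for creating and filling reusable records, so no serializer is required.
class SimpleRow {
    private final Object[] fields;
    SimpleRow(int arity) { fields = new Object[arity]; }
    void setField(int i, Object v) { fields[i] = v; }
    Object getField(int i) { return fields[i]; }
    int arity() { return fields.length; }
}

class RowFiller {
    private final int arity; // remembered from rowTypeInfo.getArity()

    RowFiller(int arity) {
        if (arity == 0) {
            throw new IllegalArgumentException("Row arity must be greater than 0.");
        }
        this.arity = arity;
    }

    SimpleRow fillRecord(SimpleRow reuse, Object[] parsedValues) {
        if (reuse == null) {
            reuse = new SimpleRow(arity); // arity replaces rowSerializer.getLength()
        }
        for (int i = 0; i < parsedValues.length; i++) {
            reuse.setField(i, parsedValues[i]);
        }
        return reuse;
    }
}
```

The design point: the serializer was only being used for its `getLength()`, so storing the `int` avoids constructing an `ExecutionConfig` and serializer in the constructor.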
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332671#comment-15332671 ]

ASF GitHub Bot commented on FLINK-3901:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67252744

    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    (full file context as quoted earlier in this thread; the comment is anchored on these lines:)

    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);

    --- End diff --

    let all constructors (transitively) call this constructor and move the functionality of `configure` into this constructor.

> Create a RowCsvInputFormat to use as default CSV IF in Table API
> ----------------------------------------------------------------
>
>                 Key: FLINK-3901
>                 URL: https://issues.apache.org/jira/browse/FLINK-3901
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.0.2
>            Reporter: Flavio Pompermaier
>            Assignee: Flavio Pompermaier
>            Priority: Minor
>              Labels: csv, null-values, row, tuple
>
> At the moment the Table APIs reads CSVs using the TupleCsvInputFormat, that
> has the big limitation of 25 fields and null handling.
> A new IF producing Row object is indeed necessary to avoid those limitations.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
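The constructor-telescoping refactoring suggested above can be sketched as follows. Class and field names here are illustrative placeholders, not the real `RowCsvInputFormat`; every convenience constructor delegates to the one "full" constructor, which absorbs what `configure()` used to do:

```java
import java.util.Arrays;

// Sketch of constructor telescoping: all constructors (transitively) call the
// single full constructor, so validation and setup live in exactly one place.
class CsvFormatSketch {
    static final String DEFAULT_LINE_DELIM = "\n";
    static final String DEFAULT_FIELD_DELIM = ",";

    final String lineDelimiter;
    final String fieldDelimiter;
    final boolean[] includedFields;

    CsvFormatSketch(int arity) {
        this(DEFAULT_LINE_DELIM, DEFAULT_FIELD_DELIM, arity, null);
    }

    CsvFormatSketch(int arity, boolean[] mask) {
        this(DEFAULT_LINE_DELIM, DEFAULT_FIELD_DELIM, arity, mask);
    }

    // The one constructor that does all the work (the former configure()).
    CsvFormatSketch(String lineDelim, String fieldDelim, int arity, boolean[] mask) {
        if (arity == 0) {
            throw new IllegalArgumentException("Row arity must be greater than 0.");
        }
        this.lineDelimiter = lineDelim;
        this.fieldDelimiter = fieldDelim;
        this.includedFields = (mask != null) ? mask : defaultMask(arity);
    }

    static boolean[] defaultMask(int arity) {
        boolean[] m = new boolean[arity];
        Arrays.fill(m, true); // include every field by default
        return m;
    }
}
```

This removes the risk of a constructor forgetting to call `configure()` and keeps the argument-defaulting logic in one chain.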
[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...
Github user eliaslevy commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67248839

    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -276,6 +314,41 @@ public void close() throws Exception {
     		checkErroneous();
     	}

    +	// ------------------- Logic for handling checkpoint flushing -------------------- //
    +
    +	private void acknowledgeMessage() {
    +		if(!flushOnCheckpoint) {
    +			// the logic is disabled
    +			return;
    +		}
    +		pendingRecords--;
    +	}
    +
    +	@Override
    +	public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
    +		if(flushOnCheckpoint) {
    +			// flushing is activated: We need to wait until pendingRecords is 0
    +			while(pendingRecords > 0) {
    +				try {
    +					Thread.sleep(10);

    --- End diff --

    Any reason to sleep instead of calling producer.flush() to wait for the acks?
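The alternative eliaslevy proposes can be sketched without Kafka on the classpath. `Producer` below is a hypothetical stand-in for `org.apache.kafka.clients.producer.KafkaProducer`, whose `flush()` method (available since the 0.9 client) blocks until all buffered records have been sent and acknowledged:

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for the Kafka producer client; only flush() matters here.
interface Producer {
    void flush();
}

// Sketch of the checkpoint barrier discussed above: instead of busy-waiting
// with Thread.sleep(10) until pendingRecords reaches 0, delegate the blocking
// to the client's own flush().
class CheckpointedSink {
    private final AtomicLong pendingRecords = new AtomicLong();
    private final Producer producer;
    private final boolean flushOnCheckpoint;

    CheckpointedSink(Producer producer, boolean flushOnCheckpoint) {
        this.producer = producer;
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    void recordSent()  { if (flushOnCheckpoint) pendingRecords.incrementAndGet(); }
    void recordAcked() { if (flushOnCheckpoint) pendingRecords.decrementAndGet(); }

    long snapshotState(long checkpointId) {
        if (flushOnCheckpoint) {
            // Block until all in-flight records are acknowledged, then verify.
            producer.flush();
            long pending = pendingRecords.get();
            if (pending != 0) {
                throw new IllegalStateException("Pending records after flush: " + pending);
            }
        }
        return checkpointId;
    }
}
```

A sleep loop polls; `flush()` lets the client wake the caller exactly when the last ack arrives, and any send failure can surface as an exception that fails the checkpoint instead of being silently dropped.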
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332598#comment-15332598 ]

ASF GitHub Bot commented on FLINK-4027:
---------------------------------------

Github user eliaslevy commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r67247830

    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -51,10 +54,11 @@
      * Flink Sink to produce data into a Kafka topic.
      *
      * Please note that this producer does not have any reliability guarantees.
    + * The producer implements the checkpointed interface for allowing synchronization on checkpoints.

    --- End diff --

    May want to change:

    > note that this producer does not have any reliability guarantees.

    to

    > note that this producer provides at-least-once reliability guarantees when checkpoints are enabled.

> FlinkKafkaProducer09 sink can lose messages
> -------------------------------------------
>
>                 Key: FLINK-4027
>                 URL: https://issues.apache.org/jira/browse/FLINK-4027
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously. A callback can record
> publishing errors, which will be raised when detected. But as far as I can
> tell, there is no barrier to wait for async errors from the sink when
> checkpointing or to track the event time of acked messages to inform the
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the
> requests return a failure after the checkpoint occurred, those messages will
> be lost as the checkpoint will consider them processed by the sink.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle
[ https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332547#comment-15332547 ]

ASF GitHub Bot commented on FLINK-4024:
---------------------------------------

Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2100

    Thanks @fhueske !

> FileSourceFunction not adjusted to new IF lifecycle
> ---------------------------------------------------
>
>                 Key: FLINK-4024
>                 URL: https://issues.apache.org/jira/browse/FLINK-4024
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Chesnay Schepler
>            Assignee: Kostas Kloudas
>            Priority: Critical
>             Fix For: 1.1.0
>
> The InputFormat lifecycle was extended in
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus
> will fail for every InputFormat that leverages these new methods.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle
[ https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332544#comment-15332544 ]

ASF GitHub Bot commented on FLINK-4024:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2100

    Merging

> FileSourceFunction not adjusted to new IF lifecycle
> ---------------------------------------------------
>
>                 Key: FLINK-4024
>                 URL: https://issues.apache.org/jira/browse/FLINK-4024
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Chesnay Schepler
>            Assignee: Kostas Kloudas
>            Priority: Critical
>             Fix For: 1.1.0
>
> The InputFormat lifecycle was extended in
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus
> will fail for every InputFormat that leverages these new methods.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE
[ https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332545#comment-15332545 ]

ASF GitHub Bot commented on FLINK-3908:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2007

    Merging

> FieldParsers error state is not reset correctly to NONE
> --------------------------------------------------------
>
>                 Key: FLINK-3908
>                 URL: https://issues.apache.org/jira/browse/FLINK-3908
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.0.2
>            Reporter: Flavio Pompermaier
>            Assignee: Flavio Pompermaier
>              Labels: parser
>
> If during the parse of a csv there's a parse error (for example when in a
> integer column there are non-int values) the errorState is not reset
> correctly in the next parseField call. A simple fix would be to add as a
> first statement of the {{parseField()}} function a call to
> {{setErrorState(ParseErrorState.NONE)}} but it is something that should be
> handled better (by default) for every subclass of {{FieldParser}}.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
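The fix described in the issue — resetting the error state at the start of every parse so one bad record cannot poison the next — can be sketched with a framework-free stand-in for `FieldParser` (names simplified; this is not Flink's actual class hierarchy):

```java
// Simplified mirror of Flink's FieldParser contract: the base class resets
// the error state before delegating to the subclass, so subclasses cannot
// forget to do it ("handled better by default for every subclass").
abstract class FieldParserSketch<T> {
    enum ParseErrorState { NONE, NUMERIC_VALUE_FORMAT_ERROR }

    private ParseErrorState errorState = ParseErrorState.NONE;

    protected void setErrorState(ParseErrorState s) { errorState = s; }
    ParseErrorState getErrorState() { return errorState; }

    final T parse(String field) {
        setErrorState(ParseErrorState.NONE); // the proposed reset, in one place
        return parseField(field);
    }

    protected abstract T parseField(String field);
}

class IntParserSketch extends FieldParserSketch<Integer> {
    @Override
    protected Integer parseField(String field) {
        try {
            return Integer.parseInt(field.trim());
        } catch (NumberFormatException e) {
            setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
            return null;
        }
    }
}
```

Putting the reset in a final template method rather than at the top of each `parseField()` override is one way to make the default behavior safe for every subclass, which is what the issue asks for.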
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332538#comment-15332538 ] ASF GitHub Bot commented on FLINK-3650: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/1856 I think the easiest way is to fork off a branch from the current master and cherry-pick all your commits one after the other onto that branch (except for the merge commit of course). Then you squash all commits and force push into the PR branch to update the PR. > Add maxBy/minBy to Scala DataSet API > > > Key: FLINK-3650 > URL: https://issues.apache.org/jira/browse/FLINK-3650 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan > > The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. > These methods are not supported by the Scala DataSet API. These methods > should be added in order to have a consistent API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE
[ https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332516#comment-15332516 ] ASF GitHub Bot commented on FLINK-3908: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2007 Good to merge. > FieldParsers error state is not reset correctly to NONE > --- > > Key: FLINK-3908 > URL: https://issues.apache.org/jira/browse/FLINK-3908 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: parser > > If during the parse of a csv there's a parse error (for example when in a > integer column there are non-int values) the errorState is not reset > correctly in the next parseField call. A simple fix would be to add as a > first statement of the {{parseField()}} function a call to > {{setErrorState(ParseErrorState.NONE)}} but it is something that should be > handled better (by default) for every subclass of {{FieldParser}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-4027: - Assignee: Robert Metzger > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those messages will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332450#comment-15332450 ] ASF GitHub Bot commented on FLINK-4027: --- GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/2108 [FLINK-4027] Flush FlinkKafkaProducer on checkpoints A user on the mailing list raised the point that our Kafka producer can be made at-least-once quite easily. The current producer code doesn't have any guarantees. We are using the producer's callbacks to account for unacknowledged records. When a checkpoint barrier reaches the sink, it will confirm the checkpoint once all pending records have been acked. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink4027 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2108.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2108 commit d657ca8be1420a3e73c48bbbf65788fbd0b75c2c Author: Robert Metzger Date: 2016-06-15T15:50:38Z [FLINK-4027] Flush FlinkKafkaProducer on checkpoints > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those messages will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
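The approach described in the PR — use the producer's callbacks to count unacknowledged records, and only confirm a checkpoint once everything in flight has been acked — can be sketched as follows. This is an illustrative stand-in with hypothetical names, not the actual FlinkKafkaProducer code:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of the at-least-once pattern: a counter tracks records whose acks
 *  are still outstanding; the checkpoint is only confirmed after a flush
 *  drains them and no async error was recorded. */
class AckTrackingSink {
    private final AtomicLong pending = new AtomicLong();
    private volatile RuntimeException asyncError; // set by a failing ack callback

    // Stand-in for the Kafka client: acks arrive asynchronously, so we queue
    // the callbacks here and deliver them in flush().
    private final Queue<Runnable> inFlightAcks = new ArrayDeque<>();

    /** Per-record send: in the real sink this callback goes to KafkaProducer.send(). */
    void invoke(String record) {
        pending.incrementAndGet();
        inFlightAcks.add(() -> pending.decrementAndGet());
    }

    /** Stand-in for producer.flush(): deliver every outstanding ack. */
    void flush() {
        Runnable ack;
        while ((ack = inFlightAcks.poll()) != null) ack.run();
    }

    /** Checkpoint-barrier handler: fail fast on async errors, then refuse to
     *  confirm the checkpoint while records are still unacknowledged. */
    void snapshotState() {
        if (asyncError != null) throw asyncError;
        flush();
        if (pending.get() != 0)
            throw new IllegalStateException("unacked records; cannot confirm checkpoint");
    }

    long pendingRecords() { return pending.get(); }
}
```

The key invariant is that pendingRecords() is zero at the moment the checkpoint is confirmed, which closes exactly the window the issue describes (a failure arriving after the checkpoint already marked the records as processed).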
[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.
[ https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332448#comment-15332448 ] ASF GitHub Bot commented on FLINK-4068: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2102 Thanks @wuchong, your approach looks good. I also found that the `ReduceExpressionRules` had no effect due to the missing `RexExecutor`. However, it seems that several tests of `ExpressionITCase` are failing with this change. You can verify that the PR does not break the build by locally running `mvn clean install`. In addition, the added test should be changed as sketched in the comment. Please let me know if you have questions. Thanks, Fabian > Move constant computations out of code-generated `flatMap` functions. > - > > Key: FLINK-4068 > URL: https://issues.apache.org/jira/browse/FLINK-4068 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske > > The generated functions for expressions of the Table API or SQL include > constant computations. > For instance the code generated for a predicate like: > {code} > myInt < (10 + 20) > {code} > looks roughly like: > {code} > public void flatMap(Row in, Collector<Row> out) { > Integer in1 = in.productElement(1); > int temp = 10 + 20; > if (in1 < temp) { > out.collect(in) > } > } > {code} > In this example the computation of {{temp}} is constant and could be moved > out of the {{flatMap()}} method. > The same might apply for generated functions other than {{FlatMap}} as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.
[ https://issues.apache.org/jira/browse/FLINK-4068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332427#comment-15332427 ] ASF GitHub Bot commented on FLINK-4068: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2102#discussion_r67233943 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala --- @@ -146,4 +148,21 @@ class SelectITCase( tEnv.sql(sqlQuery) } + @Test + def testConstantReduce(): Unit = { --- End diff -- I think the assertion of this test should not check the name of the generated Flink operator. Instead I propose the following: - split the `translate()` method into an `optimize()` method that generates the optimized `RelNode` tree and a `translate()` method that translates into a `DataSet` / `DataStream` program. - make the `optimize()` method `private[flink]` and therefore accessible from a unit test - add a `BatchTableEnvironmentTest` and a `StreamTableEnvironmentTest` which check that the optimized `RelNode` tree contains reduced expressions. > Move constant computations out of code-generated `flatMap` functions. > - > > Key: FLINK-4068 > URL: https://issues.apache.org/jira/browse/FLINK-4068 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske > > The generated functions for expressions of the Table API or SQL include > constant computations. > For instance the code generated for a predicate like: > {code} > myInt < (10 + 20) > {code} > looks roughly like: > {code} > public void flatMap(Row in, Collector<Row> out) { > Integer in1 = in.productElement(1); > int temp = 10 + 20; > if (in1 < temp) { > out.collect(in) > } > } > {code} > In this example the computation of {{temp}} is constant and could be moved > out of the {{flatMap()}} method. > The same might apply for generated functions other than {{FlatMap}} as well.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
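The optimization the issue asks for amounts to evaluating the constant expression once, outside the per-record method, instead of on every flatMap() invocation. A rough illustration in plain Java (hand-written, not actual generated code):

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the constant-reduced form of the predicate `myInt < (10 + 20)`:
 *  the literal expression is folded into a single value that is computed
 *  once, not recomputed for every record. */
class ConstantFoldedFilter {
    // Reduced at (pseudo) code-generation time rather than per record.
    private static final int THRESHOLD = 10 + 20;

    /** Per-record path: compares against the precomputed constant only. */
    List<Integer> flatMap(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        for (int v : input) {
            if (v < THRESHOLD) {   // no per-record re-evaluation of 10 + 20
                out.add(v);
            }
        }
        return out;
    }
}
```

In Calcite terms (as the comments above discuss), this is what the ReduceExpressionRules produce once a RexExecutor is available: the RexNode `10 + 20` is replaced by the literal `30` in the optimized RelNode tree before code generation runs.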
[jira] [Commented] (FLINK-3916) Allow generic types passing the Table API
[ https://issues.apache.org/jira/browse/FLINK-3916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332354#comment-15332354 ] Vasia Kalavri commented on FLINK-3916: -- FLINK-3615 is about extending or refactoring {{sqlTypeToTypeInfo}}. I do not recall what kind of trouble it had caused at that time, but it looks like we were thinking of refactoring the code to determine return types per operator. > Allow generic types passing the Table API > - > > Key: FLINK-3916 > URL: https://issues.apache.org/jira/browse/FLINK-3916 > Project: Flink > Issue Type: Improvement > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther > > The Table API currently only supports BasicTypes that can pass the Table API. > Other types should also be supported but treated as black boxes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332336#comment-15332336 ] ASF GitHub Bot commented on FLINK-3650: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/1856 How to remove the merge commit? If I try to remove it I lose all my commits. @fhueske - I have updated the PR. Thanks for very sharp eyes like seeing the spaces and new lines that were missed. > Add maxBy/minBy to Scala DataSet API > > > Key: FLINK-3650 > URL: https://issues.apache.org/jira/browse/FLINK-3650 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan > > The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. > These methods are not supported by the Scala DataSet API. These methods > should be added in order to have a consistent API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332251#comment-15332251 ] ASF GitHub Bot commented on FLINK-3650: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67216793 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala --- @@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { } /** +* Selects an element with minimum value. +* +* The minimum is computed over the specified fields in lexicographical order. +* +* Example 1: Given a data set with elements [0, 1], [1, 0], the +* results will be: +* +* minBy(0)[0, 1] +* minBy(1)[1, 0] +* Example 2: Given a data set with elements [0, 0], [0, 1], the +* results will be: +* minBy(0, 1)[0, 0] +* If multiple values with minimum value at the specified fields exist, a random one will be +* picked. +* Internally, this operation is implemented as a {@link ReduceFunction}. --- End diff -- Ok > Add maxBy/minBy to Scala DataSet API > > > Key: FLINK-3650 > URL: https://issues.apache.org/jira/browse/FLINK-3650 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan > > The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. > These methods are not supported by the Scala DataSet API. These methods > should be added in order to have a consistent API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332250#comment-15332250 ] ASF GitHub Bot commented on FLINK-3650: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67216772 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala --- @@ -699,6 +700,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { } /** +* Selects an element with minimum value. +* +* The minimum is computed over the specified fields in lexicographical order. +* +* Example 1: Given a data set with elements [0, 1], [1, 0], the +* results will be: +* +* minBy(0)[0, 1] +* minBy(1)[1, 0] +* Example 2: Given a data set with elements [0, 0], [0, 1], the +* results will be: +* minBy(0, 1)[0, 0] +* If multiple values with minimum value at the specified fields exist, a random one will be +* picked. +* Internally, this operation is implemented as a {@link ReduceFunction}. +*/ + def minBy(fields: Int*) : DataSet[T] = { +if (!getType.isTupleType) { + throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.") +} + +reduce(new SelectByMinFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray)) + } + + /** +* Selects an element with maximum value. +* +* The maximum is computed over the specified fields in lexicographical order. +* +* Example 1: Given a data set with elements [0, 1], [1, 0], the +* results will be: +* +* maxBy(0)[1, 0] +* maxBy(1)[0, 1] +* Example 2: Given a data set with elements [0, 0], [0, 1], the +* results will be: +* maxBy(0, 1)[0, 1] +* If multiple values with maximum value at the specified fields exist, a random one will be +* picked +* Internally, this operation is implemented as a {@link ReduceFunction}. +* +*/ + def maxBy(fields: Int*) : DataSet[T] = { +if (!getType.isTupleType) { + throw new InvalidProgramException("DataSet#maxBy(int...) 
only works on Tuple types.") +} +reduce(new SelectByMaxFunction[T](getType.asInstanceOf[TupleTypeInfoBase[T]], fields.toArray)) + } --- End diff -- This is very sharp eyes :) > Add maxBy/minBy to Scala DataSet API > > > Key: FLINK-3650 > URL: https://issues.apache.org/jira/browse/FLINK-3650 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan > > The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. > These methods are not supported by the Scala DataSet API. These methods > should be added in order to have a consistent API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
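The minBy semantics documented in the scaladoc above — compare the given fields in order, keep the smaller tuple, and pick arbitrarily on ties — can be illustrated with a small stand-alone reduction over int arrays. This mirrors the assumed behavior of SelectByMinFunction inside a reduce; it is a sketch, not the operator's actual code:

```java
import java.util.List;

/** Illustrative minBy: lexicographic comparison over the specified field
 *  positions, folded with a reduce, as described in the scaladoc. */
class MinBy {
    static int[] minBy(List<int[]> data, int... fields) {
        return data.stream().reduce((a, b) -> {
            for (int f : fields) {
                if (a[f] < b[f]) return a;   // a wins on the first differing field
                if (a[f] > b[f]) return b;   // b wins on the first differing field
            }
            return a; // equal on all compared fields: either may be picked
        }).get();
    }
}
```

Running the scaladoc's examples through this sketch: for elements [0, 1] and [1, 0], minBy(0) keeps [0, 1] and minBy(1) keeps [1, 0]; for [0, 0] and [0, 1], minBy(0, 1) keeps [0, 0].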
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1533#comment-1533 ] ASF GitHub Bot commented on FLINK-3650: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67214413 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java --- @@ -521,7 +521,7 @@ public long count() throws Exception { } return new ReduceOperator<>(this, new SelectByMinFunction( - (TupleTypeInfo) getType(), fields), Utils.getCallLocationName()); --- End diff -- I have already reverted the style change. The only thing is that it was reverted on top of the previous commit. How to totally avoid this change from appearing from my commit history? Am not sure how to do it. > Add maxBy/minBy to Scala DataSet API > > > Key: FLINK-3650 > URL: https://issues.apache.org/jira/browse/FLINK-3650 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan > > The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. > These methods are not supported by the Scala DataSet API. These methods > should be added in order to have a consistent API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records
[ https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4080: --- Description: I've occasionally experienced unsuccessful ManualExactlyOnceTest after several tries. Kinesis records of the same aggregated batch will have the same sequence number, and different sub-sequence numbers (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). The current code of the consumer is committing state every time it finishes processing a record, even de-aggregated ones. This is a bug since this will incorrectly mark all remaining records of the de-aggregated batch as processed in the state. Proposed fix: 1. Use the extended `UserRecord` class in KCL to represent all records (either non- or de-aggregated) instead of the basic `Record` class. This gives access to whether or not the record was originally aggregated. 2. The sequence number state we are checkpointing needs to be able to indicate that the last seen sequence number of a shard may be a de-aggregated shard, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record of the 5th record was last seen for shard 0. On restore, we start again from record 5 for shard 0 and skip the first 7 sub-records; however, for shard 1 we start from record 3 since record 2 is non-aggregated and already fully processed. was: I've occasionally experienced unsuccessful ManualExactlyOnceTest after several tries. Kinesis records of the same aggregated batch will have the same sequence number, and different sub-sequence numbers (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). The current code of the consumer is committing state every time it finishes processing a record, even de-aggregated ones. This is a bug since this will incorrectly mark all remaining records of the de-aggregated batch as processed in the state. 
Proposed fix: Use the extended `UserRecord` class in KCL to represent all records (either non- or de-aggregated) instead of the basic `Record` class. This gives access to whether or not the record was originally aggregated. If we encounter a de-aggregated record, don't update state until we finished processing the last record of the batch. > Kinesis consumer not exactly-once if stopped in the middle of processing > aggregated records > --- > > Key: FLINK-4080 > URL: https://issues.apache.org/jira/browse/FLINK-4080 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.1.0 > > > I've occasionally experienced unsuccessful ManualExactlyOnceTest after > several tries. > Kinesis records of the same aggregated batch will have the same sequence > number, and different sub-sequence numbers > (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). > The current code of the consumer is committing state every time it finishes > processing a record, even de-aggregated ones. This is a bug since this will > incorrectly mark all remaining records of the de-aggregated batch as > processed in the state. > Proposed fix: > 1. Use the extended `UserRecord` class in KCL to represent all records > (either non- or de-aggregated) instead of the basic `Record` class. This > gives access to whether or not the record was originally aggregated. > 2. The sequence number state we are checkpointing needs to be able to > indicate that the last seen sequence number of a shard may be a de-aggregated > shard, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record > of the 5th record was last seen for shard 0. On restore, we start again from > record 5 for shard 0 and skip the first 7 sub-records; however, for shard 1 > we start from record 3 since record 2 is non-aggregated and already fully > processed. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
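The composite per-shard state in the FLINK-4080 description above (e.g. {"shard0" -> "5:8"}) could be modeled roughly as follows. This is a hypothetical sketch for illustration; the class and method names are assumptions, not the actual Flink Kinesis connector code:

```java
// Illustrative model of the proposed per-shard checkpoint state: a sequence
// number plus an optional sub-sequence number for de-aggregated records.
// (Names are assumptions for this sketch, not the connector's real API.)
class SequenceNumberState {
    final long sequenceNumber;      // sequence number of the Kinesis record
    final long subSequenceNumber;   // -1 => record was not aggregated

    SequenceNumberState(long sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
    }

    /** On restore, decide whether an incoming (sub-)record was already processed. */
    boolean alreadyProcessed(long seq, long subSeq) {
        if (seq < sequenceNumber) {
            return true;   // earlier record: fully processed
        }
        if (seq > sequenceNumber) {
            return false;  // later record: not yet seen
        }
        // Same record: a non-aggregated record is fully processed; for an
        // aggregated one, only sub-records before the stored sub-sequence
        // number are skipped (matching "skip the first 7 sub-records" above).
        return subSequenceNumber < 0 || subSeq < subSequenceNumber;
    }

    @Override
    public String toString() {
        return subSequenceNumber < 0
            ? Long.toString(sequenceNumber)
            : sequenceNumber + ":" + subSequenceNumber;
    }

    public static void main(String[] args) {
        SequenceNumberState shard0 = new SequenceNumberState(5, 8);
        System.out.println("shard0 -> " + shard0);          // prints: shard0 -> 5:8
        System.out.println(shard0.alreadyProcessed(5, 3));  // prints: true
    }
}
```

With state `(5, 8)` for shard 0, restore skips sub-records 1-7 of record 5 and re-reads from sub-record 8; with state `(2, -1)` for shard 1, record 2 counts as fully processed and consumption resumes at record 3, matching the behavior described above.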
[jira] [Updated] (FLINK-4078) Use ClosureCleaner for CoGroup where
[ https://issues.apache.org/jira/browse/FLINK-4078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-4078: --- Assignee: Stefan Richter > Use ClosureCleaner for CoGroup where > > > Key: FLINK-4078 > URL: https://issues.apache.org/jira/browse/FLINK-4078 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.3 >Reporter: Ufuk Celebi >Assignee: Stefan Richter > Fix For: 1.1.0 > > > When specifying a key selector in the where clause of a CoGroup, the closure > cleaner is not used. > {code} > .coGroup(filteredIds) > .where(new KeySelector() { > @Override > public String getKey(T t) throws Exception { > String s = (String) t.get(fieldName); > return s != null ? s : UUID.randomUUID().toString(); > } > }) > {code} > The problem is that the KeySelector is an anonymous inner class and as such > as a reference to the outer object. Normally, this would be rectified by the > closure cleaner but the cleaner is not used in CoGroup.where(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
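The capture problem behind FLINK-4078 can be demonstrated outside Flink: a non-static anonymous inner class that touches outer state holds a synthetic reference (`this$0`) to its enclosing instance, which drags the whole outer object into serialization unless a closure cleaner strips it. A minimal, self-contained sketch (the `KeySelector` interface here is a simplified stand-in, not Flink's actual class):

```java
// Demonstrates why CoGroup.where() needs the closure cleaner: an anonymous
// inner class keeps an implicit reference ("this$0") to its enclosing object.
import java.io.Serializable;
import java.lang.reflect.Field;

class OuterReferenceDemo {
    private final String fieldName = "name"; // outer state the selector touches

    // Simplified stand-in for Flink's KeySelector (hypothetical interface).
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    KeySelector<String, String> makeSelector() {
        return new KeySelector<String, String>() {
            @Override
            public String getKey(String t) {
                // Reading `fieldName` forces the compiler to capture `this$0`.
                return t + ":" + fieldName;
            }
        };
    }

    /** Checks reflectively whether an object carries an enclosing-instance field. */
    static boolean capturesOuter(Object o) {
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        KeySelector<String, String> sel = new OuterReferenceDemo().makeSelector();
        System.out.println(sel.getKey("alice"));        // prints: alice:name
        System.out.println(capturesOuter(sel));         // prints: true
    }
}
```

A closure cleaner nulls out such references when they are unused; the bug report above is that `CoGroup.where()` never invokes it.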
[jira] [Updated] (FLINK-4079) YARN properties file used for per-job cluster
[ https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-4079: -- Fix Version/s: 1.1.0 > YARN properties file used for per-job cluster > - > > Key: FLINK-4079 > URL: https://issues.apache.org/jira/browse/FLINK-4079 > Project: Flink > Issue Type: Bug > Components: Command-line client >Affects Versions: 1.0.3 >Reporter: Ufuk Celebi >Assignee: Maximilian Michels > Fix For: 1.1.0, 1.0.4 > > > YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN > properties file, which defines the container configuration. This can lead to > unexpected behaviour, because the per-job-cluster configuration is merged > with the YARN properties file (or used as only configuration source). > A user ran into this as follows: > - Create a long-lived YARN session with HA (creates a hidden YARN properties > file) > - Submits standalone batch jobs with a per job cluster (flink run -m > yarn-cluster). The batch jobs get submitted to the long lived HA cluster, > because of the properties file. > [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch > and will be fixed with the client refactoring you are working on? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4079) YARN properties file used for per-job cluster
[ https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-4079: -- Component/s: Command-line client > YARN properties file used for per-job cluster > - > > Key: FLINK-4079 > URL: https://issues.apache.org/jira/browse/FLINK-4079 > Project: Flink > Issue Type: Bug > Components: Command-line client >Affects Versions: 1.0.3 >Reporter: Ufuk Celebi >Assignee: Maximilian Michels > Fix For: 1.1.0, 1.0.4 > > > YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN > properties file, which defines the container configuration. This can lead to > unexpected behaviour, because the per-job-cluster configuration is merged > with the YARN properties file (or used as only configuration source). > A user ran into this as follows: > - Create a long-lived YARN session with HA (creates a hidden YARN properties > file) > - Submits standalone batch jobs with a per job cluster (flink run -m > yarn-cluster). The batch jobs get submitted to the long lived HA cluster, > because of the properties file. > [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch > and will be fixed with the client refactoring you are working on? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (FLINK-4079) YARN properties file used for per-job cluster
[ https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reopened FLINK-4079: --- Assignee: Maximilian Michels I see what you mean now :) The issue is that the Yarn properties file is loaded regardless of whether "-m yarn-cluster" is specified on the command-line. This loads the dynamic properties from the Yarn properties file and applies all configuration of the running (session) cluster to the to-be-created cluster. This is not expected behavior. > YARN properties file used for per-job cluster > - > > Key: FLINK-4079 > URL: https://issues.apache.org/jira/browse/FLINK-4079 > Project: Flink > Issue Type: Bug > Components: Command-line client >Affects Versions: 1.0.3 >Reporter: Ufuk Celebi >Assignee: Maximilian Michels > Fix For: 1.1.0, 1.0.4 > > > YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN > properties file, which defines the container configuration. This can lead to > unexpected behaviour, because the per-job-cluster configuration is merged > with the YARN properties file (or used as only configuration source). > A user ran into this as follows: > - Create a long-lived YARN session with HA (creates a hidden YARN properties > file) > - Submits standalone batch jobs with a per job cluster (flink run -m > yarn-cluster). The batch jobs get submitted to the long lived HA cluster, > because of the properties file. > [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch > and will be fixed with the client refactoring you are working on? -- This message was sent by Atlassian JIRA (v6.3.4#6332)

[jira] [Updated] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records
[ https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4080: --- Description: I've occasionally experienced unsuccessful ManualExactlyOnceTest after several tries. Kinesis records of the same aggregated batch will have the same sequence number, and different sub-sequence numbers (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). The current code of the consumer is committing state every time it finishes processing a record, even de-aggregated ones. This is a bug since this will incorrectly mark all remaining records of the de-aggregated batch as processed in the state. Proposed fix: Use the extended `UserRecord` class in KCL to represent all records (either non- or de-aggregated) instead of the basic `Record` class. This gives access to whether or not the record was originally aggregated. If we encounter a de-aggregated record, don't update state until we finished processing the last record of the batch. was: I've occasionally experienced unsuccessful ManualExactlyOnceTest after several tries. Kinesis records of the same aggregated batch will have the same sequence number, and different sub-sequence numbers (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). The current state of the consumer is committing state every time it finishes processing a record, even de-aggregated ones. This is a bug since this will incorrectly mark all remaining records of the de-aggregated batch as processed in the state. Proposed fix: Use the extended `UserRecord` class in KCL to represent all records (either non- or de-aggregated) instead of the basic `Record` class. This gives access to whether or not the record was originally aggregated. If we encounter a de-aggregated record, don't update state until we finished processing the last record of the batch. 
> Kinesis consumer not exactly-once if stopped in the middle of processing > aggregated records > --- > > Key: FLINK-4080 > URL: https://issues.apache.org/jira/browse/FLINK-4080 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.1.0 > > > I've occasionally experienced unsuccessful ManualExactlyOnceTest after > several tries. > Kinesis records of the same aggregated batch will have the same sequence > number, and different sub-sequence numbers > (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). > The current code of the consumer is committing state every time it finishes > processing a record, even de-aggregated ones. This is a bug since this will > incorrectly mark all remaining records of the de-aggregated batch as > processed in the state. > Proposed fix: Use the extended `UserRecord` class in KCL to represent all > records (either non- or de-aggregated) instead of the basic `Record` class. > This gives access to whether or not the record was originally aggregated. If > we encounter a de-aggregated record, don't update state until we finished > processing the last record of the batch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records
Tzu-Li (Gordon) Tai created FLINK-4080: -- Summary: Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records Key: FLINK-4080 URL: https://issues.apache.org/jira/browse/FLINK-4080 Project: Flink Issue Type: Sub-task Components: Kinesis Connector, Streaming Connectors Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Priority: Critical Fix For: 1.1.0 I've occasionally experienced unsuccessful ManualExactlyOnceTest after several tries. Kinesis records of the same aggregated batch will have the same sequence number, and different sub-sequence numbers (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). The current state of the consumer is committing state every time it finishes processing a record, even de-aggregated ones. This is a bug since this will incorrectly mark all remaining records of the de-aggregated batch as processed in the state. Proposed fix: Use the extended `UserRecord` class in KCL to represent all records (either non- or de-aggregated) instead of the basic `Record` class. This gives access to whether or not the record was originally aggregated. If we encounter a de-aggregated record, don't update state until we finished processing the last record of the batch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4079) YARN properties file used for per-job cluster
[ https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332032#comment-15332032 ] Maximilian Michels commented on FLINK-4079: --- Exactly, the yarn properties file is a means to easily submit jobs against a long-running Flink cluster (so called yarn session). Actually, it is the only proper way at the moment besides figuring out the JobManager location manually and essentially treating the Yarn cluster as a Standalone cluster. I would also prefer to get rid of the properties file and only use the yarn application id instead. We could probably do that for one of the next releases but it would be a breaking change for the 1.1 release. > YARN properties file used for per-job cluster > - > > Key: FLINK-4079 > URL: https://issues.apache.org/jira/browse/FLINK-4079 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.3 >Reporter: Ufuk Celebi > Fix For: 1.0.4 > > > YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN > properties file, which defines the container configuration. This can lead to > unexpected behaviour, because the per-job-cluster configuration is merged > with the YARN properties file (or used as only configuration source). > A user ran into this as follows: > - Create a long-lived YARN session with HA (creates a hidden YARN properties > file) > - Submits standalone batch jobs with a per job cluster (flink run -m > yarn-cluster). The batch jobs get submitted to the long lived HA cluster, > because of the properties file. > [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch > and will be fixed with the client refactoring you are working on? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4077) Register Pojo DataSet/DataStream as Table requires alias expression.
[ https://issues.apache.org/jira/browse/FLINK-4077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332025#comment-15332025 ] ASF GitHub Bot commented on FLINK-4077: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/2107 [FLINK-4077] Register Pojo DataSet/DataStream as Table with field references - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink tablePojoInput Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2107.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2107 commit 7e394b4097af95bc2ddfd7c33a69ba3188b5e17b Author: Fabian Hueske Date: 2016-06-15T15:12:10Z [FLINK-4077] Register Pojo DataSet/DataStream as Table with field references. > Register Pojo DataSet/DataStream as Table requires alias expression. > > > Key: FLINK-4077 > URL: https://issues.apache.org/jira/browse/FLINK-4077 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > Fix For: 1.1.0 > > > Registering a Pojo DataSet / DataStream as Table requires alias expressions > and does not work with simple field references. However, alias expressions > would only be necessary if the fields of the Pojo should be renamed. > {code} > DataStream persons = ... 
> // DOES NOT WORK > tEnv.registerDataStream( > "Persons", > persons, > "name, age, address"); > // DOES WORK > tEnv.registerDataStream( > "Persons", > persons, > "name AS name, age AS age, address AS address"); > {code} > We should also allow simple field name references in addition to alias > expressions to rename fields. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
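The snippet in FLINK-4077 above assumes a POJO input type. For reference, a minimal Flink-style POJO matching those fields could look like this (a hypothetical example type; Flink requires a public no-argument constructor and public fields or getters/setters for POJO type extraction):

```java
// Minimal POJO as assumed by the registerDataStream() snippet above
// (hypothetical example type, not part of the Flink code base).
public class Person {
    public String name;
    public int age;
    public String address;

    // Flink POJO types require a public no-argument constructor.
    public Person() {}

    public Person(String name, int age, String address) {
        this.name = name;
        this.age = age;
        this.address = address;
    }
}
```

With the proposed fix, such a type can be registered with plain field references ("name, age, address") instead of redundant self-aliases.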
[GitHub] flink pull request #2107: [FLINK-4077] Register Pojo DataSet/DataStream as T...
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/2107 [FLINK-4077] Register Pojo DataSet/DataStream as Table with field references - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink tablePojoInput Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2107.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2107 commit 7e394b4097af95bc2ddfd7c33a69ba3188b5e17b Author: Fabian Hueske Date: 2016-06-15T15:12:10Z [FLINK-4077] Register Pojo DataSet/DataStream as Table with field references. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4079) YARN properties file used for per-job cluster
[ https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332002#comment-15332002 ] Ufuk Celebi commented on FLINK-4079: I understand where you are coming from, but it seems very weird to me that the behaviour I describe can happen (where the first started long running Flink session is actually still running, so the warning you mention would not be shown). I always had the mental model that "bin/flink run -m yarn-cluster" starts a cluster for this job only, which is not the case if we re-use the properties file. I think that the properties file is mainly a concern for the long-lived YARN session, where cluster start and job submission are not tied together. > YARN properties file used for per-job cluster > - > > Key: FLINK-4079 > URL: https://issues.apache.org/jira/browse/FLINK-4079 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.3 >Reporter: Ufuk Celebi > Fix For: 1.0.4 > > > YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN > properties file, which defines the container configuration. This can lead to > unexpected behaviour, because the per-job-cluster configuration is merged > with the YARN properties file (or used as only configuration source). > A user ran into this as follows: > - Create a long-lived YARN session with HA (creates a hidden YARN properties > file) > - Submits standalone batch jobs with a per job cluster (flink run -m > yarn-cluster). The batch jobs get submitted to the long lived HA cluster, > because of the properties file. > [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch > and will be fixed with the client refactoring you are working on? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4079) YARN properties file used for per-job cluster
[ https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-4079. --- Resolution: Not A Problem This is a feature :) The Yarn properties file is supposed to be picked up. We can't break this behavior because users rely on it. My changes will give an appropriate error message if the properties file configuration doesn't correspond to a running application in the Yarn cluster. > YARN properties file used for per-job cluster > - > > Key: FLINK-4079 > URL: https://issues.apache.org/jira/browse/FLINK-4079 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.3 >Reporter: Ufuk Celebi > Fix For: 1.0.4 > > > YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN > properties file, which defines the container configuration. This can lead to > unexpected behaviour, because the per-job-cluster configuration is merged > with the YARN properties file (or used as only configuration source). > A user ran into this as follows: > - Create a long-lived YARN session with HA (creates a hidden YARN properties > file) > - Submits standalone batch jobs with a per job cluster (flink run -m > yarn-cluster). The batch jobs get submitted to the long lived HA cluster, > because of the properties file. > [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch > and will be fixed with the client refactoring you are working on? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes
[ https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331982#comment-15331982 ] Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:14 PM: I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. E.g. the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could for example: * Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However, this is probably not the best idea as it violates the substitution principle. * Introduce a common interface for both, {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous requests by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as model for the web interface is substituted. Maybe there is some cleaner way of introducing a real life cycle for ExecutionGraphs? Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}), and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed into the ExecutionGraph implementations. The web interface needs to be changed to extract the information through the interface. was (Author: srichter): I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. E.g. 
the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could for example: * Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However this probably not the best idea as it violates the substitution principle. * Introduce a common interface for both, {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous request by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as model for the web interface is substituted. Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}), and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed into the ExecutionGraph implementations. The web interface needs to be changed to extract the information through the interface. > Introduce ArchivedExecutionGraph without any user classes > - > > Key: FLINK-4037 > URL: https://issues.apache.org/jira/browse/FLINK-4037 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Robert Metzger >Assignee: Stefan Richter > > As a follow up to FLINK-4011: In order to allow the JobManager to unload all > classes from a finished job, we need to convert the ExecutionGraph (and some > attached objects like the ExecutionConfig) into a stringified version, not > containing any user classes. > The web frontend can show strings only anyways. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
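The "common interface" option from the FLINK-4037 comment above could be sketched roughly as follows. All names here are illustrative assumptions for the sake of discussion, not Flink's actual API:

```java
// Sketch of a read-only view shared by ExecutionGraph and
// ArchivedExecutionGraph so the web interface can render both kinds of job.
// (Hypothetical names, not the real Flink classes.)
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

interface AccessExecutionGraph {
    String getJobName();
    String getJsonPlan();
    // User-typed values (accumulators, metrics, ...) are kept only as strings,
    // so the archived form retains no references to user classes.
    Map<String, String> getStringifiedAccumulators();
}

final class ArchivedExecutionGraph implements AccessExecutionGraph {
    private final String jobName;
    private final String jsonPlan;
    private final Map<String, String> accumulators;

    ArchivedExecutionGraph(String jobName, String jsonPlan,
                           Map<String, String> accumulators) {
        this.jobName = jobName;
        this.jsonPlan = jsonPlan;
        // Defensive copy: the archive must not share state with the live graph,
        // otherwise the user classloader could never be garbage-collected.
        this.accumulators = Collections.unmodifiableMap(new HashMap<>(accumulators));
    }

    @Override public String getJobName() { return jobName; }
    @Override public String getJsonPlan() { return jsonPlan; }
    @Override public Map<String, String> getStringifiedAccumulators() { return accumulators; }
}
```

A `prepareForArchiving()`-style conversion would then produce an `ArchivedExecutionGraph` from a live graph, stringifying user-typed values before releasing the references.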
[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes
[ https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331982#comment-15331982 ] Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:13 PM: I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. E.g. the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could for example: * Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However this probably not the best idea as it violates the substitution principle. * Introduce a common interface for both, {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous request by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as model for the web interface is substituted. Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}), and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed into the ExecutionGraph implementations. The web interface needs to be changed to extract the information through the interface. was (Author: srichter): I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. E.g. 
the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could for example: * Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However this probably not the best idea as it violates the substitution principle. * Introduce a common interface for both, {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous request by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as model for the web interface is substituted. Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}), and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed the ExecutionGraphs. The web interface needs to be changed to extract the information through the interface. > Introduce ArchivedExecutionGraph without any user classes > - > > Key: FLINK-4037 > URL: https://issues.apache.org/jira/browse/FLINK-4037 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Robert Metzger >Assignee: Stefan Richter > > As a follow up to FLINK-4011: In order to allow the JobManager to unload all > classes from a finished job, we need to convert the ExecutionGraph (and some > attached objects like the ExecutionConfig) into a stringified version, not > containing any user classes. > The web frontend can show strings only anyways. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes
[ https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331982#comment-15331982 ] Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:12 PM: I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. E.g. the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could for example: * Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However this probably not the best idea as it violates the substitution principle. * Introduce a common interface for both, {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous request by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as model for the web interface is substituted. Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}), and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed the ExecutionGraphs. The web interface needs to be changed to extract the information through the interface. was (Author: srichter): I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. E.g. 
the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could for example: * Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However this probably not the best idea as it violates the substitution principle. * Introduce a common interface for both, {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous request by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as model for the web interface is substituted. Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}, and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed the ExecutionGraphs. The web interface needs to be changed to extract the information through the interface. > Introduce ArchivedExecutionGraph without any user classes > - > > Key: FLINK-4037 > URL: https://issues.apache.org/jira/browse/FLINK-4037 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Robert Metzger >Assignee: Stefan Richter > > As a follow up to FLINK-4011: In order to allow the JobManager to unload all > classes from a finished job, we need to convert the ExecutionGraph (and some > attached objects like the ExecutionConfig) into a stringified version, not > containing any user classes. > The web frontend can show strings only anyways. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes
[ https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331982#comment-15331982 ] Stefan Richter edited comment on FLINK-4037 at 6/15/16 4:11 PM: I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. E.g. the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could for example: * Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However this probably not the best idea as it violates the substitution principle. * Introduce a common interface for both, {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous request by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as model for the web interface is substituted. Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}, and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed the ExecutionGraphs. The web interface needs to be changed to extract the information through the interface. was (Author: srichter): I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. E.g. 
the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could for example: * Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However, this is probably not the best idea, as it violates the substitution principle. * Introduce a common interface for both {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous requests by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as model for the web interface is substituted. Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}), and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed to the ExecutionGraphs. The web interface needs to be changed to extract the information through the interface. > Introduce ArchivedExecutionGraph without any user classes > - > > Key: FLINK-4037 > URL: https://issues.apache.org/jira/browse/FLINK-4037 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Robert Metzger >Assignee: Stefan Richter > > As a follow up to FLINK-4011: In order to allow the JobManager to unload all > classes from a finished job, we need to convert the ExecutionGraph (and some > attached objects like the ExecutionConfig) into a stringified version, not > containing any user classes. > The web frontend can show strings only anyways. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes
[ https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331982#comment-15331982 ] Stefan Richter commented on FLINK-4037: --- I think introducing an {{ArchivedExecutionGraph}} to maintain relevant information from the {{ExecutionGraph}} of finished jobs is a little more involved. E.g. the web interface has to deal with objects from {{ArchivedExecutionGraph}} as well as {{ExecutionGraph}} to display finished and in-flight jobs. To achieve this, we could for example: * Make {{ExecutionGraph}} a subclass of {{ArchivedExecutionGraph}}. However, this is probably not the best idea, as it violates the substitution principle. * Introduce a common interface for both {{ExecutionGraph}} and {{ArchivedExecutionGraph}}. This interface provides means for the web interface to extract information for display. The method {{prepareForArchiving()}} could convert ExecutionGraphs into ArchivedExecutionGraphs. We should ensure that asynchronous requests by the web interface are routed to a valid ExecutionGraph until the conversion is complete and the object that acts as model for the web interface is substituted. Furthermore, we need to identify all references in ExecutionGraph that could hold objects from user-provided classes (e.g. accumulators, metrics, ...), stringify their information (see {{JobConfigHandler.handleRequest()}}), and release the references so that the user classloader can be garbage-collected. Corresponding parts of the mentioned stringification have to be pushed to the ExecutionGraphs. The web interface needs to be changed to extract the information through the interface. 
> Introduce ArchivedExecutionGraph without any user classes > - > > Key: FLINK-4037 > URL: https://issues.apache.org/jira/browse/FLINK-4037 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Robert Metzger >Assignee: Stefan Richter > > As a follow up to FLINK-4011: In order to allow the JobManager to unload all > classes from a finished job, we need to convert the ExecutionGraph (and some > attached objects like the ExecutionConfig) into a stringified version, not > containing any user classes. > The web frontend can show strings only anyways. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
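The common-interface option discussed in the comment above can be sketched as a minimal, self-contained Java example. All names and method signatures here are hypothetical stand-ins for illustration, not the actual Flink API:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical read-only view the web interface would consume,
// so it can display finished and in-flight jobs uniformly.
interface ExecutionGraphView {
    String getJobName();
    String getState();
    Map<String, String> getJobConfig(); // already stringified, no user classes
}

// Live graph: may still hold references into the user classloader internally.
class ExecutionGraph implements ExecutionGraphView {
    private final String jobName;
    private String state = "RUNNING";

    ExecutionGraph(String jobName) { this.jobName = jobName; }

    public String getJobName() { return jobName; }
    public String getState() { return state; }
    public Map<String, String> getJobConfig() {
        // In a live graph this would be derived from user-provided objects.
        return Collections.singletonMap("parallelism", "4");
    }

    // Stringify everything and drop user-class references,
    // so the user classloader can be garbage-collected.
    ArchivedExecutionGraph prepareForArchiving() {
        state = "FINISHED";
        return new ArchivedExecutionGraph(jobName, state, getJobConfig());
    }
}

// Archived graph: plain strings only, no user classes reachable.
class ArchivedExecutionGraph implements ExecutionGraphView {
    private final String jobName;
    private final String state;
    private final Map<String, String> config;

    ArchivedExecutionGraph(String jobName, String state, Map<String, String> config) {
        this.jobName = jobName;
        this.state = state;
        this.config = config;
    }

    public String getJobName() { return jobName; }
    public String getState() { return state; }
    public Map<String, String> getJobConfig() { return config; }
}
```

Because the web interface only depends on the interface, the substitution from live to archived graph after {{prepareForArchiving()}} is invisible to it.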
[jira] [Created] (FLINK-4079) YARN properties file used for per-job cluster
Ufuk Celebi created FLINK-4079: -- Summary: YARN properties file used for per-job cluster Key: FLINK-4079 URL: https://issues.apache.org/jira/browse/FLINK-4079 Project: Flink Issue Type: Bug Affects Versions: 1.0.3 Reporter: Ufuk Celebi Fix For: 1.0.4 YARN per-job clusters (flink run -m yarn-cluster) rely on the hidden YARN properties file, which defines the container configuration. This can lead to unexpected behaviour, because the per-job-cluster configuration is merged with the YARN properties file (or the properties file is used as the only configuration source). A user ran into this as follows: - Create a long-lived YARN session with HA (creates a hidden YARN properties file) - Submit standalone batch jobs with a per-job cluster (flink run -m yarn-cluster). The batch jobs get submitted to the long-lived HA cluster because of the properties file. [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch and will be fixed with the client refactoring you are working on? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3105) Submission in per job YARN cluster mode reuses properties file of long-lived session
[ https://issues.apache.org/jira/browse/FLINK-3105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-3105. --- Resolution: Not A Problem This is actually the expected behavior. > Submission in per job YARN cluster mode reuses properties file of long-lived > session > > > Key: FLINK-3105 > URL: https://issues.apache.org/jira/browse/FLINK-3105 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 0.10.1 >Reporter: Ufuk Celebi > > Starting a YARN session with `bin/yarn-session.sh` creates a properties file, > which is used to parse job manager information when submitting jobs. > This properties file is also used when submitting a job with {{bin/flink run > -m yarn-cluster}}. The {{yarn-cluster}} mode should actually start a new YARN > session. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4078) Use ClosureCleaner for CoGroup where
Ufuk Celebi created FLINK-4078: -- Summary: Use ClosureCleaner for CoGroup where Key: FLINK-4078 URL: https://issues.apache.org/jira/browse/FLINK-4078 Project: Flink Issue Type: Bug Affects Versions: 1.0.3 Reporter: Ufuk Celebi Fix For: 1.1.0 When specifying a key selector in the where clause of a CoGroup, the closure cleaner is not used.
{code}
.coGroup(filteredIds)
.where(new KeySelector() {
    @Override
    public String getKey(T t) throws Exception {
        String s = (String) t.get(fieldName);
        return s != null ? s : UUID.randomUUID().toString();
    }
})
{code}
The problem is that the KeySelector is an anonymous inner class and as such has a reference to the outer object. Normally, this would be rectified by the closure cleaner, but the cleaner is not used in CoGroup.where(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
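The captured outer reference can be demonstrated without Flink. The self-contained sketch below (class and method names are made up for illustration) shows that an anonymous inner class referencing an enclosing field gets a synthetic `this$0` field pointing at the enclosing instance, which is exactly what a closure cleaner would have to null out before serializing the selector:

```java
import java.util.Arrays;
import java.util.concurrent.Callable;

public class CaptureDemo {
    // Mirrors the fieldName captured by the KeySelector in the snippet above.
    private final String fieldName = "id";

    // An anonymous inner class, like the KeySelector in CoGroup.where():
    // because it reads the outer fieldName, javac adds a synthetic this$0
    // field holding the enclosing CaptureDemo instance.
    public Callable<String> anonymousSelector() {
        return new Callable<String>() {
            @Override public String call() { return fieldName; }
        };
    }

    // A static nested class captures no enclosing instance and needs no cleaning.
    static class StaticSelector implements Callable<String> {
        @Override public String call() { return "id"; }
    }

    // Detect the synthetic reference to the enclosing object via reflection.
    static boolean holdsEnclosingRef(Object o) {
        return Arrays.stream(o.getClass().getDeclaredFields())
                     .anyMatch(f -> f.getName().equals("this$0"));
    }
}
```

Without cleaning, serializing the anonymous selector would drag the whole enclosing object (and everything it references) along with it.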
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331894#comment-15331894 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2085 Rebased to the latest master and refined the tests. > Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters > -- > > Key: FLINK-3937 > URL: https://issues.apache.org/jira/browse/FLINK-3937 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Maximilian Michels >Priority: Trivial > Attachments: improve_flink_cli_yarn_integration.patch > > > Currently, flink cli can't figure out JobManager RPC location for > Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop > subcommands are hard to invoke if you only know the YARN application ID. As > an improvement, I suggest adding a -yid option to the > mentioned subcommands that can be used together with -m yarn-cluster. Flink > cli would then retrieve JobManager RPC location from YARN ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2085 Rebased to the latest master and refined the tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4077) Register Pojo DataSet/DataStream as Table requires alias expression.
Fabian Hueske created FLINK-4077: Summary: Register Pojo DataSet/DataStream as Table requires alias expression. Key: FLINK-4077 URL: https://issues.apache.org/jira/browse/FLINK-4077 Project: Flink Issue Type: Bug Components: Table API Affects Versions: 1.1.0 Reporter: Fabian Hueske Assignee: Fabian Hueske Fix For: 1.1.0 Registering a Pojo DataSet / DataStream as Table requires alias expressions and does not work with simple field references. However, alias expressions would only be necessary if the fields of the Pojo should be renamed.
{code}
DataStream persons = ...

// DOES NOT WORK
tEnv.registerDataStream("Persons", persons, "name, age, address");

// DOES WORK
tEnv.registerDataStream("Persons", persons, "name AS name, age AS age, address AS address");
{code}
We should also allow simple field name references in addition to alias expressions (which rename fields). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
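One way the fix could behave is to treat a simple field reference as shorthand for an identity alias. The self-contained sketch below is purely illustrative (it is not Flink code, and the class name is invented); it normalizes a field-expression string the way the registration call could:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class FieldRefs {
    /**
     * Expand simple field references into the alias form that Pojo
     * registration currently requires: "name, age" becomes
     * "name AS name, age AS age", while explicit aliases pass through.
     */
    public static String normalize(String fields) {
        return Arrays.stream(fields.split(","))
                .map(String::trim)
                .map(f -> f.toUpperCase().contains(" AS ") ? f : f + " AS " + f)
                .collect(Collectors.joining(", "));
    }
}
```

With such a normalization step, both variants from the issue description would be accepted, and aliases remain available when a field should actually be renamed.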
[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers
[ https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331882#comment-15331882 ] ASF GitHub Bot commented on FLINK-4063: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2106 +1 > Add Metrics Support for Triggers > > > Key: FLINK-4063 > URL: https://issues.apache.org/jira/browse/FLINK-4063 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > Now that we have proper support for metrics we should also add a hook that > allows triggers to report metrics. > This supersedes FLINK-3758 which was about using accumulators for metrics in > triggers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2106: [FLINK-4063] Add Metrics Support for Triggers
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2106 +1
[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers
[ https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331880#comment-15331880 ] ASF GitHub Bot commented on FLINK-4063: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2106 @zentol updated, this is super minimal now. > Add Metrics Support for Triggers > > > Key: FLINK-4063 > URL: https://issues.apache.org/jira/browse/FLINK-4063 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > Now that we have proper support for metrics we should also add a hook that > allows triggers to report metrics. > This supersedes FLINK-3758 which was about using accumulators for metrics in > triggers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2106: [FLINK-4063] Add Metrics Support for Triggers
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2106 @zentol updated, this is super minimal now.
[jira] [Commented] (FLINK-4076) BoltWrapper#dispose() should call AbstractStreamOperator#dispose()
[ https://issues.apache.org/jira/browse/FLINK-4076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331841#comment-15331841 ] Aljoscha Krettek commented on FLINK-4076: - +1 this is correct > BoltWrapper#dispose() should call AbstractStreamOperator#dispose() > -- > > Key: FLINK-4076 > URL: https://issues.apache.org/jira/browse/FLINK-4076 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > @Override > public void dispose() { > this.bolt.cleanup(); > } > {code} > AbstractStreamOperator#dispose() should be called first. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
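The fix FLINK-4076 asks for is the standard super-call pattern in teardown methods. The following self-contained sketch uses toy stand-ins for the Flink classes (same names, but not the real implementations) to show the corrected dispose():

```java
// Toy stand-in: in Flink, this base class releases operator-level resources.
class AbstractStreamOperator {
    boolean disposed = false;
    public void dispose() {
        disposed = true; // base-class cleanup must not be skipped
    }
}

// Toy stand-in for the wrapped Storm bolt.
class Bolt {
    boolean cleaned = false;
    void cleanup() { cleaned = true; }
}

class BoltWrapper extends AbstractStreamOperator {
    final Bolt bolt = new Bolt();

    @Override
    public void dispose() {
        super.dispose();     // the missing call the issue points out
        this.bolt.cleanup();
    }
}
```

Calling `super.dispose()` first ensures the operator's own resources are released before (and regardless of how) the bolt cleanup proceeds.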
[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers
[ https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331804#comment-15331804 ] ASF GitHub Bot commented on FLINK-4063: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2106 Ok, good to know. I'll change it. 😃 > Add Metrics Support for Triggers > > > Key: FLINK-4063 > URL: https://issues.apache.org/jira/browse/FLINK-4063 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > Now that we have proper support for metrics we should also add a hook that > allows triggers to report metrics. > This supersedes FLINK-3758 which was about using accumulators for metrics in > triggers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2106: [FLINK-4063] Add Metrics Support for Triggers
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2106 Ok, good to know. I'll change it. 😃
[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers
[ https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331788#comment-15331788 ] ASF GitHub Bot commented on FLINK-4063: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2106 well it would work, but i think we can improve it a little. :) The AbstractStreamOperator contains a protected MetricGroup field which you can use in the WindowOperator instead of going through the RuntimeContext. You can access it directly or via getMetricGroup(). Also, you create 2 groups with a constant name. This means that all metrics that are registered in a trigger now contain "WindowOperator.Trigger" in their final metric name. Now the Trigger group is probably _fine_, but is now also mandatory. I would prefer removing it; if users want to group their metrics by Trigger they can easily do so themselves. The WindowOperator group should be removed. It is inconsistent with other operators and provides no really specific information. It will usually be redundant since the operator name often contains the String "window" somewhere. > Add Metrics Support for Triggers > > > Key: FLINK-4063 > URL: https://issues.apache.org/jira/browse/FLINK-4063 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > Now that we have proper support for metrics we should also add a hook that > allows triggers to report metrics. > This supersedes FLINK-3758 which was about using accumulators for metrics in > triggers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2106: [FLINK-4063] Add Metrics Support for Triggers
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2106 well it would work, but i think we can improve it a little. :) The AbstractStreamOperator contains a protected MetricGroup field which you can use in the WindowOperator instead of going through the RuntimeContext. You can access it directly or via getMetricGroup(). Also, you create 2 groups with a constant name. This means that all metrics that are registered in a trigger now contain "WindowOperator.Trigger" in their final metric name. Now the Trigger group is probably _fine_, but is now also mandatory. I would prefer removing it; if users want to group their metrics by Trigger they can easily do so themselves. The WindowOperator group should be removed. It is inconsistent with other operators and provides no really specific information. It will usually be redundant since the operator name often contains the String "window" somewhere.
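The naming concern in the review above can be made concrete with a self-contained toy model of metric groups (not Flink's MetricGroup API; all classes below are simplified stand-ins). Each `addGroup()` call prepends a component to every metric name registered beneath it, so wrapping trigger metrics in constant-name groups bakes "WindowOperator.Trigger" into every final metric name:

```java
import java.util.ArrayList;
import java.util.List;

// Toy metric group: addGroup() extends the scope, counter() registers a
// metric whose final name is the scope plus the metric name.
class MetricGroup {
    private final String scope;
    private final MetricGroup root;
    final List<String> registered = new ArrayList<>();

    MetricGroup() { this.scope = ""; this.root = this; }
    private MetricGroup(String scope, MetricGroup root) { this.scope = scope; this.root = root; }

    MetricGroup addGroup(String name) {
        return new MetricGroup(scope.isEmpty() ? name : scope + "." + name, root);
    }

    void counter(String name) {
        root.registered.add(scope.isEmpty() ? name : scope + "." + name);
    }
}

class WindowOperator {
    // stands in for the operator-level metric group field mentioned above
    final MetricGroup metrics = new MetricGroup();

    void registerTriggerMetricWrapped() {
        // reviewed approach: two mandatory constant-name groups
        metrics.addGroup("WindowOperator").addGroup("Trigger").counter("firings");
    }

    void registerTriggerMetricDirect() {
        // suggested approach: hand the trigger the operator's group directly
        metrics.counter("firings");
    }
}
```

With the direct approach, users who want a Trigger grouping can still call `addGroup("Trigger")` themselves, but it is no longer forced on every trigger metric.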
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331785#comment-15331785 ] ASF GitHub Bot commented on FLINK-3859: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2088 Thanks for the update @twalthr. The changes look good. Should be good to merge after the aggregators commit is added. > Add BigDecimal/BigInteger support to Table API > -- > > Key: FLINK-3859 > URL: https://issues.apache.org/jira/browse/FLINK-3859 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Since FLINK-3786 has been solved, we can now start integrating > BigDecimal/BigInteger into the Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger support to...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2088 Thanks for the update @twalthr. The changes look good. Should be good to merge after the aggregators commit is added.
[jira] [Created] (FLINK-4076) BoltWrapper#dispose() should call AbstractStreamOperator#dispose()
Ted Yu created FLINK-4076: - Summary: BoltWrapper#dispose() should call AbstractStreamOperator#dispose() Key: FLINK-4076 URL: https://issues.apache.org/jira/browse/FLINK-4076 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor
{code}
@Override
public void dispose() {
    this.bolt.cleanup();
}
{code}
AbstractStreamOperator#dispose() should be called first. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15331751#comment-15331751 ] ASF GitHub Bot commented on FLINK-3859: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r67161013 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala --- @@ -143,15 +143,30 @@ object ScalarFunctions { new MultiTypeMethodCallGen(BuiltInMethods.ABS)) addSqlFunction( +ABS, +Seq(BIG_DEC_TYPE_INFO), +new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC)) + + addSqlFunction( --- End diff -- I see, thanks > Add BigDecimal/BigInteger support to Table API > -- > > Key: FLINK-3859 > URL: https://issues.apache.org/jira/browse/FLINK-3859 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Since FLINK-3786 has been solved, we can now start integrating > BigDecimal/BigInteger into the Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger sup...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r67161013 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala --- @@ -143,15 +143,30 @@ object ScalarFunctions { new MultiTypeMethodCallGen(BuiltInMethods.ABS)) addSqlFunction( +ABS, +Seq(BIG_DEC_TYPE_INFO), +new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC)) + + addSqlFunction( --- End diff -- I see, thanks