[jira] [Commented] (FLINK-5590) Create a proper internal state hierarchy
[ https://issues.apache.org/jira/browse/FLINK-5590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833260#comment-15833260 ] Xiaogang Shi commented on FLINK-5590: - [~StephanEwen], do you have any ideas about the solution? I think {{KvState}} already provides some of the needed internal methods. Maybe we can extend it to create the internal state hierarchy? > Create a proper internal state hierarchy > > > Key: FLINK-5590 > URL: https://issues.apache.org/jira/browse/FLINK-5590 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing > Affects Versions: 1.2.0 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Currently, the state interfaces (like {{ListState}}, {{ValueState}}, > {{ReducingState}}) are very sparse and contain only methods exposed to the > users. That makes sense in order to keep the stable public API minimal. > At the same time, the runtime needs more methods for its internal interaction > with state, such as: > - setting namespaces > - accessing raw values > - merging namespaces > These are currently realized by re-creating or re-obtaining the state objects > from the KeyedStateBackend. That approach causes quite a bit of overhead for each > access to the state. > The KeyedStateBackend tries some tricks to reduce that overhead, but > does so only partially and introduces other overhead in the process. > The root cause of all these issues is a design problem: there is no > proper "internal state abstraction" analogous to the external > state abstraction (the public state API). > We should add a similar hierarchy of states for the internal methods. 
It would look like the example below: {code}
 *                State
 *                  |
 *                  +-------------------InternalKvState
 *                  |                          |
 *             MergingState                    |
 *                  |                          |
 *                  +-----------------InternalMergingState
 *                  |                          |
 *         +--------+--------+                 |
 *         |                 |          +------+------+
 *  ReducingState        ListState      |             |
 *         |                 |          |             |
 *         |                 +----------|---InternalListState
 *         |                            |
 *         +------------------------InternalReducingState
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
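The split between a minimal public API and a richer internal one can be sketched as plain Java interfaces. This is a hypothetical illustration of the hierarchy in the diagram, not Flink's actual API; the method names (`setCurrentNamespace`, `mergeNamespaces`) and the toy list-state implementation are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Public interfaces stay minimal; Internal* counterparts add the
// runtime-only methods (namespaces, namespace merging). Hypothetical names.
interface State {
    void clear();
}

interface MergingState<IN, OUT> extends State {
    void add(IN value);
    OUT get();
}

interface InternalKvState<N> extends State {
    void setCurrentNamespace(N namespace);
}

interface InternalMergingState<N, IN, OUT>
        extends InternalKvState<N>, MergingState<IN, OUT> {
    void mergeNamespaces(N target, Collection<N> sources);
}

// Toy in-memory "internal list state": the runtime can switch namespaces and
// merge them without re-obtaining the state object from the backend.
class InMemoryListState<N, T> implements InternalMergingState<N, T, List<T>> {
    private final Map<N, List<T>> byNamespace = new HashMap<>();
    private N current;

    public void setCurrentNamespace(N namespace) { current = namespace; }

    public void add(T value) {
        byNamespace.computeIfAbsent(current, k -> new ArrayList<>()).add(value);
    }

    public List<T> get() {
        return byNamespace.getOrDefault(current, Collections.emptyList());
    }

    public void mergeNamespaces(N target, Collection<N> sources) {
        List<T> merged = byNamespace.computeIfAbsent(target, k -> new ArrayList<>());
        for (N source : sources) {
            List<T> values = byNamespace.remove(source);
            if (values != null) merged.addAll(values);
        }
    }

    public void clear() { byNamespace.remove(current); }
}
```

The point of the shape is that user code only ever sees `MergingState`, while window merging and checkpointing cast to the internal sibling.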
[jira] [Assigned] (FLINK-5544) Implement Internal Timer Service in RocksDB
[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi reassigned FLINK-5544: --- Assignee: Xiaogang Shi > Implement Internal Timer Service in RocksDB > --- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: Streaming > Reporter: Xiaogang Shi > Assignee: Xiaogang Shi > > Now the only implementation of the internal timer service is > HeapInternalTimerService, which stores all timers in memory. In cases > where the number of keys is very large, the timer service will cost too much > memory. An implementation that stores timers in RocksDB seems like a good way to handle > these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in the order of timestamp. But when performing checkpoints, > we must have a method to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put > together and are sorted. > Then we can deploy an in-memory heap which keeps the first timer of each key > group to get the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap.
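The merge-sort idea above can be sketched in plain Java, with a `TreeMap` standing in for RocksDB's sorted key layout and a heap holding only each key group's earliest timer. All class and method names here are hypothetical, and the sketch simplifies to one timer per (group, timestamp) pair.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Sketch of the proposed design: timers of a key group live in a sorted
// structure (as RocksDB keys ordered by KEY_GROUP#TIMER#KEY would be), and a
// small heap tracks only the earliest timer of each key group.
class KeyGroupTimerSketch {
    // Per key group: timestamp -> key (mimics RocksDB's sorted order).
    private final Map<Integer, TreeMap<Long, String>> groups = new HashMap<>();
    // Heap of {timestamp, keyGroup}: the head timer of each key group.
    private final PriorityQueue<long[]> heads =
        new PriorityQueue<>(Comparator.comparingLong((long[] a) -> a[0]));

    void register(int keyGroup, long timestamp, String key) {
        TreeMap<Long, String> group =
            groups.computeIfAbsent(keyGroup, k -> new TreeMap<>());
        Long oldHead = group.isEmpty() ? null : group.firstKey();
        group.put(timestamp, key);
        // Only refresh the heap when the group's earliest timer changed.
        if (oldHead == null || timestamp < oldHead) {
            heads.removeIf(e -> e[1] == keyGroup);
            heads.add(new long[]{timestamp, keyGroup});
        }
    }

    /** Pops the globally earliest timer (as the trigger path would). */
    long[] pollNext() {
        long[] head = heads.poll();
        if (head == null) return null;
        TreeMap<Long, String> group = groups.get((int) head[1]);
        group.pollFirstEntry();
        // Re-insert the group's new head, if any, to keep the heap consistent.
        if (!group.isEmpty()) heads.add(new long[]{group.firstKey(), head[1]});
        return head; // {timestamp, keyGroup}
    }
}
```

Checkpointing a key group then only needs a range read over that group's sorted entries, while triggering stays a heap pop.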
[jira] [Comment Edited] (FLINK-4683) Add SlideRow row-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833214#comment-15833214 ] sunjincheng edited comment on FLINK-4683 at 1/22/17 1:12 AM: - Hi [~fhueske], the reason I am picking up this issue is that the sliding-row window logic (window division, trigger mechanism) should be consistent between Stream and Batch, and it is more convenient for the same person to implement both, so I want to pick up FLINK-4680 and FLINK-4683. :) I agree with you about the row-windows (FLINK-4678, FLINK-4679, FLINK-4680): we need an overall design and, more importantly, we need to consider the design of the SQL implementation and maintain a high degree of consistency with Calcite. IMO, OVER is the standard database window; perhaps in addition to Flink's current three window types ("Tumbling Window", "Session Window", "Sliding Window") we need a new window type. I want to call it "Relative Window" (we can discuss the name). What do you think? was (Author: sunjincheng121): Hi,[~fhueske] I picking up this issue of the reason, sliding-row window whether it is Stream or Batch logic (window division, trigger mechanism) should be consistent, by the same person to implement more convenient, so I want to pick up FLINK-4680 and FLINK-4683.:) I agree with you about row-window (FLINK-4678, FLINK-4679, FLINK -4680) We need a whole design, more importantly, we need to consider the design of the SQL implementation, and need to maintain a high degree of consistency with calcite .IMO, in fact OVER is the standard database window, perhaps in addition to the current flink There are three window types ("Tumbling Window", "Session Window", "Sliding Window"). We need a new window type. I want to call it "Relative Window".(the name We can discuss again). 
> Add SlideRow row-windows for batch tables > - > > Key: FLINK-4683 > URL: https://issues.apache.org/jira/browse/FLINK-4683 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Affects Versions: 1.2.0 > Reporter: Fabian Hueske > Assignee: sunjincheng > > Add SlideRow row-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. >
[jira] [Commented] (FLINK-4683) Add SlideRow row-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833214#comment-15833214 ] sunjincheng commented on FLINK-4683: Hi [~fhueske], the reason I am picking up this issue is that the sliding-row window logic (window division, trigger mechanism) should be consistent between Stream and Batch, and it is more convenient for the same person to implement both, so I want to pick up FLINK-4680 and FLINK-4683. :) I agree with you about the row-windows (FLINK-4678, FLINK-4679, FLINK-4680): we need an overall design and, more importantly, we need to consider the design of the SQL implementation and maintain a high degree of consistency with Calcite. IMO, OVER is the standard database window; perhaps in addition to Flink's current three window types ("Tumbling Window", "Session Window", "Sliding Window") we need a new window type. I want to call it "Relative Window" (we can discuss the name).
[jira] [Comment Edited] (FLINK-5525) Streaming Version of a Linear Regression model
[ https://issues.apache.org/jira/browse/FLINK-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833070#comment-15833070 ] Stavros Kontopoulos edited comment on FLINK-5525 at 1/21/17 5:38 PM: - [~mtunqiue] Sure, I agree there are other algorithms, e.g. clustering, which may have a streaming version; feel free to open issues for them and work on them. If you want to coordinate on this, let me know. For example, we need to settle on the abstractions. Check the Spark implementation for an example of what the abstractions might be. I didn't open other issues because I wanted to see what people think first. was (Author: skonto): [~mtunqiue] Sure I agree there other algorithms eg. clustering which may have a streaming version feel free to open others and work on them. If you want to co-ordinate on this let me know. For example we need to set the abstractions first like, check Spark implementation for an example. I didn't do that because I wanted to see what people think first. > Streaming Version of a Linear Regression model > -- > > Key: FLINK-5525 > URL: https://issues.apache.org/jira/browse/FLINK-5525 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Stavros Kontopoulos > > Given the nature of Flink we should have a streaming version of the > algorithms when possible. > Update of the model should be done on a per window basis. > An extreme case is: https://en.wikipedia.org/wiki/Online_machine_learning > Resources > [1] > http://scikit-learn.org/dev/modules/scaling_strategies.html#incremental-learning > [2] > http://stats.stackexchange.com/questions/6920/efficient-online-linear-regression > [3] https://spark.apache.org/docs/1.1.0/mllib-linear-methods.html
[jira] [Comment Edited] (FLINK-5525) Streaming Version of a Linear Regression model
[ https://issues.apache.org/jira/browse/FLINK-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833070#comment-15833070 ] Stavros Kontopoulos edited comment on FLINK-5525 at 1/21/17 5:38 PM: - [~mtunqiue] Sure, I agree there are other algorithms, e.g. clustering, which may have a streaming version; feel free to open issues for them and work on them. If you want to coordinate on this, let me know. For example, we need to settle on the abstractions. Check the Spark implementation for an example of what the abstractions might be. I didn't open other issues because I wanted to see what people think first. was (Author: skonto): [~mtunqiue] Sure I agree there other algorithms eg. clustering which may have a streaming version feel free to open others and work on them. If you want to co-ordinate on this let me know. For example we need to set the abstractions. Check Spark implementation for an example of what the abstractions might be. I didn't open other issues because I wanted to see what people think first.
[jira] [Comment Edited] (FLINK-5525) Streaming Version of a Linear Regression model
[ https://issues.apache.org/jira/browse/FLINK-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833070#comment-15833070 ] Stavros Kontopoulos edited comment on FLINK-5525 at 1/21/17 5:37 PM: - [~mtunqiue] Sure, I agree there are other algorithms, e.g. clustering, which may have a streaming version; feel free to open issues for them and work on them. If you want to coordinate on this, let me know. For example, we need to settle on the abstractions first; check the Spark implementation for an example. I didn't do that because I wanted to see what people think first. was (Author: skonto): [~mtunqiue] Sure I agree there other algorithms eg. clustering which may have a streaming version feel free to open others. I didn't do that because I wanted to see what people think first.
[jira] [Commented] (FLINK-5525) Streaming Version of a Linear Regression model
[ https://issues.apache.org/jira/browse/FLINK-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833070#comment-15833070 ] Stavros Kontopoulos commented on FLINK-5525: [~mtunqiue] Sure, I agree there are other algorithms, e.g. clustering, which may have a streaming version; feel free to open issues for them. I didn't do that because I wanted to see what people think first.
[jira] [Created] (FLINK-5602) Migration with RocksDB job led to NPE for next checkpoint
Ufuk Celebi created FLINK-5602: -- Summary: Migration with RocksDB job led to NPE for next checkpoint Key: FLINK-5602 URL: https://issues.apache.org/jira/browse/FLINK-5602 Project: Flink Issue Type: Bug Reporter: Ufuk Celebi When migrating a job with RocksDB, I got the following exception when the next checkpoint was triggered. This only happened once and I could not reproduce it since. [~stefanrichte...@gmail.com] Maybe we can look over the code and check what could have failed here? Unfortunately, I don't have more of the stack trace available. I don't think that this will be very helpful, will it? {code} at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) at org.apache.flink.runtime.state.KeyedBackendSerializationProxy$StateMetaInfo.<init>(KeyedBackendSerializationProxy.java:126) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:471) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:382) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:280) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.performOperation(RocksDBKeyedStateBackend.java:262) at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:37) ... 6 more {code}
[jira] [Created] (FLINK-5601) Window operator does not checkpoint watermarks
Ufuk Celebi created FLINK-5601: -- Summary: Window operator does not checkpoint watermarks Key: FLINK-5601 URL: https://issues.apache.org/jira/browse/FLINK-5601 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Ufuk Celebi During release testing, [~stefanrichte...@gmail.com] and I noticed that watermarks are not checkpointed in the window operator. This can lead to non-determinism when restoring checkpoints. I was running an adjusted {{SessionWindowITCase}} via Kafka for testing migration and rescaling and ran into failures, because the data generator required deterministic behaviour. What happened was that on restore it could happen that late elements were not dropped, because the watermarks needed to be re-established after the restore first. [~aljoscha] Do you know whether there is a special reason for explicitly not checkpointing watermarks?
[jira] [Created] (FLINK-5600) Improve error message when triggering savepoint without specified directory
Ufuk Celebi created FLINK-5600: -- Summary: Improve error message when triggering savepoint without specified directory Key: FLINK-5600 URL: https://issues.apache.org/jira/browse/FLINK-5600 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Ufuk Celebi Priority: Minor When triggering a savepoint w/o specifying a custom target directory or having configured a default directory, we get a quite long stack trace: {code} java.lang.Exception: Failed to trigger savepoint at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:801) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalStateException: No savepoint directory configured. You can either specify a directory when triggering this savepoint or configure a cluster-wide default via key 'state.savepoints.dir'. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:764) ... 22 more {code} This is already quite good, because the Exception says what can be done to work around this problem, but we can make it even better by handling this error in the client and printing a more explicit message. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5599) State interface docs often refer to keyed state only
Ufuk Celebi created FLINK-5599: -- Summary: State interface docs often refer to keyed state only Key: FLINK-5599 URL: https://issues.apache.org/jira/browse/FLINK-5599 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Ufuk Celebi Priority: Minor The JavaDocs of the {{State}} interface (and related classes) often mention keyed state only as the state interface was only exposed for keyed state until Flink 1.1. With the new {{CheckpointedFunction}} interface, this has changed and the docs should be adjusted accordingly. Would be nice to address this with 1.2.0 so that the JavaDocs are updated for users. [~stefanrichte...@gmail.com] or [~aljoscha] maybe you can have a look at this briefly?
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3149 Hi @fhueske , Regarding the field type serialization, I think maybe we can provide default deserialization for basic types (int, long, String...) if users use `Bytes.toBytes(...)` to serialize the basic types. If not, users can have this field return raw bytes in this way: `htableSchema.add("column_family", "qualifier", byte[].class)` and then use a user-defined scalar function to deserialize the value. Regarding the row keys, I agree with you. It would be great if we could set the scan range via the WHERE clause. But FLINK-3849 (FilterableTableSource) is still a pending PR, so I would suggest breaking this issue into two: 1. add HBaseTableSource, providing access to HBase tables and supporting nested schema. 2. extend HBaseTableSource to support FilterableTableSource. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
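The "default deserialization for basic types with a raw-bytes fallback" proposal could look roughly like the sketch below. To keep it dependency-free, `java.nio.ByteBuffer` stands in for HBase's `Bytes` utility (which uses the same big-endian encoding for int/long), and the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch: dispatch on the declared column type (as in the proposed
// htableSchema.add(family, qualifier, type)) to a default decoder, and fall
// back to raw byte[] so a user-defined scalar function can take over.
class DefaultDecoders {
    static Object decode(Class<?> type, byte[] value) {
        if (type == Integer.class) {
            return ByteBuffer.wrap(value).getInt();   // 4 big-endian bytes
        }
        if (type == Long.class) {
            return ByteBuffer.wrap(value).getLong();  // 8 big-endian bytes
        }
        if (type == String.class) {
            return new String(value, StandardCharsets.UTF_8);
        }
        // Unknown/declared-as-byte[] column: hand back the raw bytes.
        return value;
    }
}
```

A column declared as `byte[].class` simply passes through, which is exactly the escape hatch the comment proposes for custom serialization schemes.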
[jira] [Commented] (FLINK-5584) Support Sliding-count row-window on streaming sql
[ https://issues.apache.org/jira/browse/FLINK-5584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833018#comment-15833018 ] ASF GitHub Bot commented on FLINK-5584: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3175 Hi @hongyuhong , don't worry about that. You are very welcome to contribute to Flink. And please feel free to contact us if you have any questions! > Support Sliding-count row-window on streaming sql > - > > Key: FLINK-5584 > URL: https://issues.apache.org/jira/browse/FLINK-5584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Yuhong Hong > > Calcite already supports the sliding-count row-window; the grammar looks like: > select sum(amount) over (rows 10 preceding) from Order; > select sum(amount) over (partition by user rows 10 preceding) from Order; > It will parse the SQL as a LogicalWindow RelNode. The LogicalWindow contains the > aggregate function info and window info, which is similar to Flink's > LogicalWindowAggregate, so we can add a conversion rule to directly convert > LogicalWindow into a DataStreamAggregate RelNode, and if Calcite supports more > grammar, we can extend the conversion rule.
[GitHub] flink issue #3175: [FLINK-5584]support sliding-count row-window on streaming...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3175 Hi @hongyuhong , don't worry about that. You are very welcome to contribute to Flink. And please feel free to contact us if you have any questions!
[jira] [Commented] (FLINK-5592) Wrong number of RowSerializers with nested Rows in Collection mode
[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833003#comment-15833003 ] Jark Wu commented on FLINK-5592: Hi [~tonycox], I think it may be a problem in your code. The first level of your table schema (defined by {{getReturnType}}) has only one column, named {{person}}. But the first level of your table data has two columns. That's why the exception occurred. You can modify your table data to the following and try to run again. {code} val data = List( Row.of(Row.of("data_1", "dob")), Row.of(Row.of("data_1", "dob")), Row.of(Row.of("data_1", "dob"))) {code} > Wrong number of RowSerializers with nested Rows in Collection mode > -- > > Key: FLINK-5592 > URL: https://issues.apache.org/jira/browse/FLINK-5592 > Project: Flink > Issue Type: Bug > Reporter: Anton Solovev > > {code} > @Test > def testNestedRowTypes(): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env, config) > tEnv.registerTableSource("rows", new MockSource) > val table: Table = tEnv.scan("rows") > val nestedTable: Table = tEnv.scan("rows").select('person) > val collect: Seq[Row] = nestedTable.collect() > print(collect) > } > class MockSource extends BatchTableSource[Row] { > import org.apache.flink.api.java.ExecutionEnvironment > import org.apache.flink.api.java.DataSet > override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { > val data = List( > Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), > Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), > Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub"))) > execEnv.fromCollection(data.asJava, getReturnType) > } > override def getReturnType: TypeInformation[Row] = { > new RowTypeInfo( > Array[TypeInformation[_]]( > new RowTypeInfo( > Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO), > Array("name", "age"))), > Array("person") > ) > } > } > {code} 
> throws {{java.lang.RuntimeException: Row arity of from does not match > serializers}} > stacktrace > {code} > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36) > at > org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114) > at > org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35) > at > org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47) > at > org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672) > at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
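The arity mismatch Jark Wu describes can be shown with a simplified stand-in for the check inside {{RowSerializer.copy(...)}}: the serializer is built from {{getReturnType}} (one "person" column here), so a row whose field count differs is rejected. This is just the shape of the failing check, not Flink's actual code.

```java
// Simplified stand-in for RowSerializer.copy: the number of field
// serializers comes from the declared schema, and a row with a different
// arity triggers the RuntimeException seen in the stack trace above.
class RowAritySketch {
    static Object[] copyRow(Object[] rowFields, int numFieldSerializers) {
        if (rowFields.length != numFieldSerializers) {
            throw new RuntimeException("Row arity of from does not match serializers.");
        }
        return rowFields.clone();
    }
}
```

With the schema declaring one nested column but each data row carrying two top-level fields, the copy fails exactly as reported.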