[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14963512#comment-14963512 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1217

> DataSet[Unit] doesn't work
> --
>
>                 Key: FLINK-2809
>                 URL: https://issues.apache.org/jira/browse/FLINK-2809
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala API
>            Reporter: Gabor Gevay
>            Assignee: Gabor Gevay
>            Priority: Minor
>
> The following code creates a DataSet[Unit]:
>
>     val env = ExecutionEnvironment.createLocalEnvironment()
>     val a = env.fromElements(1,2,3)
>     val b = a.map (_ => ())
>     b.writeAsText("/tmp/xxx")
>     env.execute()
>
> This doesn't work, because a VoidSerializer is created, which can't cope with
> a BoxedUnit. See exception below.
> I'm now thinking about creating a UnitSerializer class.
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to java.lang.Void
>     at org.apache.flink.api.common.typeutils.base.VoidSerializer.serialize(VoidSerializer.java:26)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
>     at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
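The failure described in the issue can be reproduced outside Flink. The sketch below is only an illustration: `SimpleSerializer` and `SimpleUnitSerializer` are hypothetical names, not Flink's `TypeSerializer` API, and writing one placeholder byte per record is an assumed encoding, not necessarily what the real `UnitSerializer` does. It shows that a boxed `()` is a `scala.runtime.BoxedUnit`, which cannot be cast to `java.lang.Void`, and what a serializer typed on `Unit` might look like.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object UnitSerializerSketch {

  // Hypothetical, simplified serializer interface (NOT Flink's TypeSerializer).
  trait SimpleSerializer[T] {
    def serialize(record: T, target: DataOutputStream): Unit
    def deserialize(source: DataInputStream): T
  }

  // Assumed encoding: a single placeholder byte per Unit record.
  object SimpleUnitSerializer extends SimpleSerializer[Unit] {
    def serialize(record: Unit, target: DataOutputStream): Unit = target.writeByte(0)
    def deserialize(source: DataInputStream): Unit = {
      source.readByte() // consume the placeholder byte
      ()
    }
  }

  // Runtime class name of a boxed Unit value.
  def boxedUnitClassName(): String = {
    val boxed: Any = ()
    boxed.getClass.getName
  }

  // True if casting a boxed Unit to java.lang.Void throws ClassCastException,
  // which is the root cause shown in the stack trace.
  def castToVoidFails(): Boolean = {
    val boxed: Any = ()
    scala.util.Try(classOf[java.lang.Void].cast(boxed))
      .failed.toOption.exists(_.isInstanceOf[ClassCastException])
  }

  // Serializes one Unit value, reads it back, and returns the bytes written.
  def roundTrip(): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    SimpleUnitSerializer.serialize((), out)
    out.flush()
    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    SimpleUnitSerializer.deserialize(in)
    bytes.size()
  }
}
```

Because the serializer is typed on `Unit` rather than `java.lang.Void`, no cast from `BoxedUnit` to `Void` is ever attempted.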
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14961835#comment-14961835 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user aalexandrov commented on the pull request:

    https://github.com/apache/flink/pull/1217#issuecomment-148901986

> Not all methods without parameters should translate to methods without parentheses...

@StephanEwen I agree with that, but I cannot understand how the `UnitTypeInfo` might cause confusion here. The `TypeInformation` is synthesized by the macro based on the inferred collection type, which means that the meaning of `()` is resolved before that. Consider the following example:

```scala
// in the Scala REPL

case class Foo(answer: Int)
// defined class Foo

def f1(): Foo = Foo(42)
// f1: ()Foo
def f2: Foo = Foo(42)
// f2: Foo

val xs = Seq(f1(), f2) // how a literate person would write it
// xs: Seq[Foo] = List(Foo(42), Foo(42))

val xs = Seq(f1, f2) // how a dazed & confused person would write it, but still compiles
// xs: Seq[Foo] = List(Foo(42), Foo(42))

val xs = Seq(f1, f2()) // even worse, but this breaks with a compiler exception
// error: Foo does not take parameters
// val xs = Seq(f1, f2())

val xs = Seq((), ()) // typing '()' without syntactic context resolves to Unit
// xs: Seq[Unit] = List((), ())
```

In all of the above situations `env.fromCollection(xs)` is either (1) going to typecheck and trigger `TypeInformation` synthesis or (2) going to fail with the error above.

Can you point to a StackOverflow conversation or something similar where the issue you mention is explained with an example?
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14961010#comment-14961010 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1217#issuecomment-148773959

Apparently you can sometimes get weird behavior if you confuse the two cases, because extra parentheses are then accidentally interpreted as a call to "apply()".
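The `apply()` pitfall mentioned in the comment above can be sketched as follows. This is an illustration only; `Greeter`, `greeter`, and `greeterP` are hypothetical names that do not come from the pull request. When a method is declared without a parameter list, a trailing `()` at the call site is applied to the method's result, so it becomes a call to `apply()` on the returned object.

```scala
object ApplyPitfall {

  // Hypothetical class whose instances can be "called" through apply().
  class Greeter {
    def apply(): String = "hello"
  }

  // No-paren method: returns a Greeter.
  def greeter: Greeter = new Greeter

  // Empty-paren method: also returns a Greeter.
  def greeterP(): Greeter = new Greeter

  // `greeter()` is read as `greeter.apply()`, so the result is a String.
  val a: String = greeter()

  // `greeterP()` just invokes the method, so the result is a Greeter.
  val b: Greeter = greeterP()
}
```

The two call sites look identical but have different result types, which is the kind of confusion the comment appears to warn about.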
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960693#comment-14960693 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user ggevay commented on the pull request:

    https://github.com/apache/flink/pull/1217#issuecomment-148716867

Can you please merge this before the 0.10 code freeze?
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960871#comment-14960871 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user ggevay commented on the pull request:

    https://github.com/apache/flink/pull/1217#issuecomment-148743631

Thanks for reviewing it! I actually can't think of any danger in overriding a Java method with a no-paren method, but I don't mind; I have changed them to empty-paren.
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960837#comment-14960837 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1217#discussion_r42252205

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.typeutils
    +
    +import org.apache.flink.api.common.ExecutionConfig
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +
    +class UnitTypeInfo extends TypeInformation[Unit] {
    +  override def isBasicType: Boolean = false
    --- End diff --

    Is it dangerous to override methods with parentheses as methods without parentheses?

    This is always tricky when mixing Java/Scala and has caused many issues up to now. The safest thing was always to not omit the parentheses when calling / referencing / overriding Java methods.
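As a small illustration of the question raised in the review comment above, the sketch below overrides a Java-defined method (`java.lang.Object#toString()`) in both styles. `NoParen`, `EmptyParen`, and `render` are hypothetical names chosen for this example; the sketch only shows that both forms compile and are invoked identically from the Java side, and it does not settle which style is safer in every interop situation.

```scala
object OverrideStyles {

  // Overrides the Java method toString() without a parameter list.
  class NoParen {
    override def toString: String = "no-paren"
  }

  // Overrides the same Java method, keeping the empty parameter list.
  class EmptyParen {
    override def toString(): String = "empty-paren"
  }

  // String.valueOf(Object) invokes toString() on the argument,
  // regardless of how the override was written in Scala.
  def render(x: AnyRef): String = String.valueOf(x)
}
```

Keeping the empty parentheses, as the reviewer suggests, mirrors the Java signature and leaves no doubt at the call site that a method is being invoked.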
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960842#comment-14960842 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1217#discussion_r42252385

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.typeutils
    +
    +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
    +import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    +
    +class UnitSerializer extends TypeSerializerSingleton[Unit] {
    +
    +  def isImmutableType: Boolean = true
    +
    +  def createInstance: Unit = ()
    --- End diff --

    Does this qualify as a pure property accessor?
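The question above refers to a common Scala convention: methods that only read state are declared without parentheses, while methods that have side effects or do work keep an empty parameter list. A minimal sketch of that convention follows; `Counter` and its members are hypothetical and unrelated to the Flink classes under review.

```scala
object ParenConvention {

  // Hypothetical mutable counter used only to illustrate the convention.
  final class Counter {
    private var count = 0

    // Pure accessor: no side effects, so it is declared without parentheses.
    def current: Int = count

    // Side-effecting method: declared and called with empty parentheses.
    def increment(): Unit = count += 1
  }

  // Increments twice and reads the value back.
  def demo(): Int = {
    val c = new Counter
    c.increment()
    c.increment()
    c.current
  }
}
```

Under this convention a factory-style method such as `createInstance` reads more like an action than a property, which may be why the reviewer questions the no-paren form.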
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960841#comment-14960841 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1217#discussion_r42252297

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.typeutils
    +
    +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
    +import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    +
    +class UnitSerializer extends TypeSerializerSingleton[Unit] {
    +
    +  def isImmutableType: Boolean = true
    --- End diff --

    Empty-parenthesis overridden as no-parenthesis.
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960845#comment-14960845 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1217#issuecomment-148741076

    Looks mostly good, but there are a few cases where I am unsure about the Scala/Java interop. Not all methods without parameters should translate to methods without parentheses...

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14947013#comment-14947013 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1217#discussion_r41403652

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.typeutils
    +
    +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
    +import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    +
    +class UnitSerializer extends TypeSerializerSingleton[Unit] {
    +
    +  def isImmutableType: Boolean = true
    +
    +  def createInstance: Unit = ()
    +
    +  def copy(from: Unit): Unit = ()
    +
    +  def copy(from: Unit, reuse: Unit): Unit = ()
    +
    +  def getLength: Int = 1
    +
    +  def serialize(record: Unit, target: DataOutputView) {
    +    target.write(0)
    +  }
    +
    +  def deserialize(source: DataInputView): Unit = {
    +    source.readByte()
    +    ()
    +  }
    +
    +  def deserialize(reuse: Unit, source: DataInputView): Unit = {
    +    source.readByte()
    +    ()
    +  }
    +
    +  def copy(source: DataInputView, target: DataOutputView) {
    +    target.write(source.readByte)
    +  }
    +
    +  override def hashCode(): Int = classOf[UnitSerializer].hashCode
    +
    +  override def canEqual(obj: scala.Any): Boolean = {
    +    obj.isInstanceOf[Unit]
    --- End diff --

    Is `Unit` equal to `UnitSerializer`?
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14947027#comment-14947027 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1217#discussion_r41404589

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.typeutils
    +
    +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
    +import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    +
    +class UnitSerializer extends TypeSerializerSingleton[Unit] {
    +
    +  def isImmutableType: Boolean = true
    +
    +  def createInstance: Unit = ()
    +
    +  def copy(from: Unit): Unit = ()
    +
    +  def copy(from: Unit, reuse: Unit): Unit = ()
    +
    +  def getLength: Int = 1
    +
    +  def serialize(record: Unit, target: DataOutputView) {
    +    target.write(0)
    +  }
    +
    +  def deserialize(source: DataInputView): Unit = {
    +    source.readByte()
    +    ()
    +  }
    +
    +  def deserialize(reuse: Unit, source: DataInputView): Unit = {
    +    source.readByte()
    +    ()
    +  }
    +
    +  def copy(source: DataInputView, target: DataOutputView) {
    +    target.write(source.readByte)
    +  }
    +
    +  override def hashCode(): Int = classOf[UnitSerializer].hashCode
    +
    +  override def canEqual(obj: scala.Any): Boolean = {
    +    obj.isInstanceOf[Unit]
    --- End diff --

    Sorry, I have now changed it to `UnitSerializer`.
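The `canEqual` question raised in this exchange comes down to which type the check tests against: `canEqual` answers "is the other object one of us?", so it should test for the enclosing class, not for the type being serialized. A minimal sketch of that pattern follows; `SingletonLikeSerializer` is a hypothetical stand-in with no Flink dependency, not the class from the pull request.

```scala
// Hypothetical stand-in for a stateless serializer; not Flink's actual class.
class SingletonLikeSerializer {
  // Test against the enclosing class, not against the serialized type (Unit).
  def canEqual(obj: Any): Boolean = obj.isInstanceOf[SingletonLikeSerializer]

  override def equals(obj: Any): Boolean = obj match {
    case that: SingletonLikeSerializer => that.canEqual(this)
    case _                             => false
  }

  // Stateless, so every instance shares one hash code.
  override def hashCode(): Int = classOf[SingletonLikeSerializer].hashCode
}

object CanEqualDemo {
  def main(args: Array[String]): Unit = {
    val a = new SingletonLikeSerializer
    val b = new SingletonLikeSerializer
    println(a == b)         // true: two instances of the same stateless class
    println(a.canEqual(())) // false: a Unit value is not a serializer
  }
}
```

With `obj.isInstanceOf[Unit]` in place of the class check, `canEqual` would accept Unit values and reject other serializer instances, which is the opposite of what an equality contract needs.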
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943140#comment-14943140 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1217#discussion_r41126581

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.typeutils
    +
    +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
    +import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    +
    +class UnitSerializer extends TypeSerializerSingleton[Unit] {
    +
    +  def isImmutableType: Boolean = true
    +
    +  def createInstance: Unit = ()
    +
    +  def copy(from: Unit): Unit = ()
    +
    +  def copy(from: Unit, reuse: Unit): Unit = ()
    +
    +  def getLength: Int = 1
    +
    +  def serialize(record: Unit, target: DataOutputView) {
    +    target.write(0)
    +  }
    +
    +  def deserialize(source: DataInputView): Unit = {
    +    source.readByte
    --- End diff --

    The `readByte()` and `write()` methods should have parentheses, both for semantics and to avoid the confusion that the Unit parentheses belong to the method call.

    I think we need an entry in the Scala coding guidelines for this (or better, a style check), because it seems that dropping the parentheses when Scala calls Java functions happens a bit too eagerly (I think most IDEs do not make very good suggestions there either).
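The parenthesis point from this review comment can be illustrated without Flink. The sketch below uses `java.io.DataInputStream` as a stand-in for Flink's `DataInputView` (an assumption made only to keep the example self-contained); `ParensDemo` and `skipOneByte` are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

object ParensDemo {
  // Consumes one byte and returns Unit, like a deserializer for a value
  // that carries no information.
  def skipOneByte(source: DataInputStream): Unit = {
    source.readByte() // side-effecting Java call: keep the parentheses
    ()                // the Unit value, visibly separate from the call above
  }

  def main(args: Array[String]): Unit = {
    val in = new DataInputStream(new ByteArrayInputStream(Array[Byte](7, 8)))
    skipOneByte(in)
    println(in.available()) // 1: one of the two bytes has been consumed
  }
}
```

Written as `source.readByte` followed by `()` on the next line, the same code still compiles, but a reader can mistake the trailing `()` for the argument list of the call, which is the confusion the comment describes.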
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943150#comment-14943150 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1217#discussion_r41126892

    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java ---
    @@ -130,8 +130,14 @@ public String toString() {
                 if (elem == null) {
                     throw new IllegalArgumentException("The collection must not contain null elements.");
                 }
    -
    -            if (!viewedAs.isAssignableFrom(elem.getClass())) {
    +
    +            // The second part of the condition is a workaround for the situation that can arise from eg.
    +            // "env.fromElements((),(),())"
    +            // In this situation, UnitTypeInfo.getTypeClass returns void.class (when we are in the Java world), but
    +            // the actual objects that we will be working with, will be BoxedUnits.
    +            if (!viewedAs.isAssignableFrom(elem.getClass()) &&
    --- End diff --

    This looks like a fragile (and inelegant) test that can easily break as soon as Scala changes something about boxed units.

    Is there a different way to do this, for example to let the `UnitTypeInfo` directly return `BoxedUnit` as the type class?

    If that is not possible, then this needs a Unit test in `flink-tests` to guard this check...
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943264#comment-14943264 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1217#discussion_r41134947

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.typeutils
    +
    +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
    +import org.apache.flink.core.memory.{DataInputView, DataOutputView}
    +
    +class UnitSerializer extends TypeSerializerSingleton[Unit] {
    +
    +  def isImmutableType: Boolean = true
    +
    +  def createInstance: Unit = ()
    +
    +  def copy(from: Unit): Unit = ()
    +
    +  def copy(from: Unit, reuse: Unit): Unit = ()
    +
    +  def getLength: Int = 1
    +
    +  def serialize(record: Unit, target: DataOutputView) {
    +    target.write(0)
    +  }
    +
    +  def deserialize(source: DataInputView): Unit = {
    +    source.readByte
    --- End diff --

    OK, sorry, I have added the parens.
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943277#comment-14943277 ]

ASF GitHub Bot commented on FLINK-2809:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1217#discussion_r41135832

    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java ---
    @@ -130,8 +130,14 @@ public String toString() {
                 if (elem == null) {
                     throw new IllegalArgumentException("The collection must not contain null elements.");
                 }
    -
    -            if (!viewedAs.isAssignableFrom(elem.getClass())) {
    +
    +            // The second part of the condition is a workaround for the situation that can arise from eg.
    +            // "env.fromElements((),(),())"
    +            // In this situation, UnitTypeInfo.getTypeClass returns void.class (when we are in the Java world), but
    +            // the actual objects that we will be working with, will be BoxedUnits.
    +            if (!viewedAs.isAssignableFrom(elem.getClass()) &&
    --- End diff --

    I agree that this is not elegant, but I don't see any better way. UnitTypeInfo.getTypeClass can't return classOf[BoxedUnit], because TypeInformation.getTypeClass() returns Class<T>, and here T is Unit.

    I already had a test for this in TypeInformationGenTest.testUnit. I have now also added a comment explaining that those two lines in the test are testing this condition.

    By the way, if you really dislike this condition, then I can just remove it. The rest of the PR already solves my original problem, so I can live without "fromElements((),(),())" working.
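The mismatch discussed in this thread (the declared type class is `void`, while the objects seen at runtime are `scala.runtime.BoxedUnit`) can be reproduced with the Scala and Java standard libraries alone. The following is a minimal sketch; `UnitClassDemo` and its members are hypothetical names, and nothing here calls Flink.

```scala
object UnitClassDemo {
  // What a type descriptor for Unit reports: the JVM's primitive `void` class.
  val declared: Class[_] = classOf[Unit]

  // What actually flows through Java code at runtime: a boxed Unit value.
  val element: Any = ()

  // Mirrors the plain assignability test from the quoted diff.
  def plainCheckPasses: Boolean = declared.isAssignableFrom(element.getClass)

  // Mirrors the cast behind the `Caused by` line in the issue report.
  def castToVoidFails: Boolean =
    try { classOf[java.lang.Void].cast(element); false }
    catch { case _: ClassCastException => true }

  def main(args: Array[String]): Unit = {
    println(declared)                 // void
    println(element.getClass.getName) // scala.runtime.BoxedUnit
    println(plainCheckPasses)         // false
    println(castToVoidFails)          // true
  }
}
```

Because the plain `isAssignableFrom` test is false for every Unit element, a collection check built only on it rejects `fromElements((),(),())`, which is why the diff adds a second condition.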
[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work
[ https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14941359#comment-14941359 ]

ASF GitHub Bot commented on FLINK-2809:
---

GitHub user ggevay opened a pull request:

    https://github.com/apache/flink/pull/1217

    [FLINK-2809] [scala-api] Added UnitTypeInfo and UnitSerializer.

    Created UnitTypeInfo and UnitSerializer, which will be used for a DataSet[Unit]. Also added a test.

    There is a funny situation in CollectionInputFormat.checkCollection: when ExecutionEnvironment.fromCollection calls it, the call to type.getTypeClass() returns void.class, even though it should be classOf[Unit]. This is probably some automatic conversion that happens when classOf[Unit] passes from the Scala world to the Java world. I worked around this by adding a check for this specific case.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ggevay/flink unitTypeInfo

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1217.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1217

commit 0fe37101586cd01148fe4bb629f8ad743a32778d
Author: Gabor Gevay
Date:   2015-10-02T16:04:59Z

    [FLINK-2809] [scala-api] Added UnitTypeInfo and UnitSerializer.
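The serializer quoted in this thread writes one byte per Unit record and discards one byte when reading. A minimal round-trip sketch of that encoding follows, under the assumption that `java.io` data streams are an acceptable stand-in for Flink's `DataOutputView` and `DataInputView`; `UnitCodec` is a hypothetical name, not a class from the pull request.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for the one-byte Unit encoding; not Flink's UnitSerializer.
object UnitCodec {
  // A Unit carries no information, so a single placeholder byte is written.
  def serialize(record: Unit, target: DataOutputStream): Unit = target.write(0)

  // Consume the placeholder byte and return the only Unit value there is.
  def deserialize(source: DataInputStream): Unit = {
    source.readByte()
    ()
  }

  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    Seq((), (), ()).foreach(u => serialize(u, out))
    out.flush()
    println(buffer.size()) // 3: one byte per record

    val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val restored = Seq.fill(3)(deserialize(in))
    println(restored.size)  // 3
    println(in.available()) // 0: every byte was consumed
  }
}
```

The fixed length of one byte per record matches `getLength: Int = 1` in the quoted diff, which keeps record boundaries countable even though each record carries no data.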