Hi Nick,

There is a similar issue on StackOverflow that was answered by @Till Rohrmann <trohrm...@apache.org> [1]. FYI.
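In case it helps, here is a minimal sketch of what the log below seems to indicate. TypeInformation.of(classOf[...]) goes through the Java TypeExtractor, which reports scala.Tuple4 as a GenericType, whereas the Scala API's createTypeInformation macro should produce a tuple type that keyBy(0) accepts. The object name TypeInfoCheck is made up for illustration, and the sketch assumes the Flink Scala streaming dependency is on the classpath.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Hypothetical helper object, only here to compare the two kinds of type information.
object TypeInfoCheck {
  // Java-side extraction: only sees the erased class scala.Tuple4,
  // so it falls back to a generic (non-tuple) type.
  val generic: TypeInformation[(Int, Long, Double, Int)] =
    TypeInformation.of(classOf[(Int, Long, Double, Int)])

  // Scala macro from the streaming package object: analyses the full
  // tuple type at compile time.
  val tuple: TypeInformation[(Int, Long, Double, Int)] =
    createTypeInformation[(Int, Long, Double, Int)]

  def main(args: Array[String]): Unit = {
    // Positional keys such as keyBy(0) need isTupleType to be true.
    println(s"TypeInformation.of:    $generic, isTupleType = ${generic.isTupleType}")
    println(s"createTypeInformation: $tuple, isTupleType = ${tuple.isTupleType}")
  }
}
```

If that matches what you see, replacing each TypeInformation.of(classOf[...]) with createTypeInformation[...] may be the simplest change. Keying with a function, for example readings.keyBy(_._1), should also avoid positional keys altogether.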
Best,
Vino

[1]: https://stackoverflow.com/questions/38214958/flink-error-specifying-keys-via-field-positions-is-only-valid-for-tuple-data-ty

Nicholas Walton <nwal...@me.com> wrote on Sat, Dec 14, 2019 at 12:01 AM:

> I was refactoring some Flink code to use IndexedSeq rather than Array. When I
> compiled the code I had failures that required according to the URL below the
> following to be inserted
>
> /*
>  * Type information (see https://stackoverflow.com/questions/37920023/could-not-find-implicit-value-for-evidence-parameter-of-type-org-apache-flink-ap)
>  *
>  * Code when ported to use IndexedSeq rather than Array
>  * and similar refuses to build without this information
>  */
> implicit val typeInfo1 = TypeInformation.of(classOf[(Int, Long, Double, Int)])
> implicit val typeInfo2 = TypeInformation.of(classOf[(Int, Long, Double, Double)])
> implicit val typeInfo3 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[Long])])
> implicit val typeInfo4 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[BigInt])])
> implicit val typeInfo5 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[String])])
> implicit val typeInfo6 = TypeInformation.of(classOf[(String, Int, Long, Double)])
> implicit val typeInfo7 = TypeInformation.of(classOf[(Int, Long, Double, IndexedSeq[String], Int)])
> implicit val typeInfo8 = TypeInformation.of(classOf[(Int, Long, Double, String, Int)])
>
> The code now compiles fine, but I now have a problem with the code below,
> which was working perfectly fine before I added the above and made the
> IndexedSeq refactor
>
> val readings: DataStream[(Int, Long, Double, Int)] = stream
>   .flatMap(new splitReadings())
>   .setParallelism(1)
>   .assignTimestampsAndWatermarks(new readingTimstamps)
>   .setParallelism(1)
>
> val maxChannelScaled: DataStream[(Int, Long, Double, Double)] = readings
>   .keyBy(0)
>   .countWindow(runmaxWinLen, 1)
>   .process(new runningMax())
>   .setParallelism(2 * env.getParallelism)
>
> When I submit the job I find the following in the log
>
> 2019-12-13 15:37:35,600 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.Tuple4 does not contain a setter for field _1
> 2019-12-13 15:37:35,601 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class scala.Tuple4 cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2019-12-13 15:37:35,602 ERROR org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler - Unhandled exception.
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple4>
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>     at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:100)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple4>
>     at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:232)
>     at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:223)
>     at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:321)
>     at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:392)
>     at org.example.Job$.main(Job.scala:99)
>     at org.example.Job.main(Job.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>     ... 9 more
>
> What is happening, and more importantly how can I fix the problem?
>
> TIA
>
> Nick