Hi,

Your POJO should implement the Serializable interface.
Otherwise it's not considered to be serializable.
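
Note also that the NotSerializableException in your trace names ThresholdAcvCalcServiceImpl rather than the POJO. That would fit the anonymous KeySelector being created inside an instance method of that class: an anonymous class keeps a hidden reference to its enclosing instance, and Java serialization then tries to write that instance as well. Below is a minimal plain-Java sketch of the effect, without Flink. KeyFn, PriceService, FirstNonNullPrice and SerializationCheck are hypothetical names made up for illustration, and I am assuming your service class looks roughly like this.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable function interface such as a key selector.
interface KeyFn<T, K> extends Serializable {
    K getKey(T value);
}

// Hypothetical stand-in for a service class that is NOT Serializable.
class PriceService {

    // Anonymous class created in an instance method: it keeps a hidden
    // reference to the enclosing PriceService instance.
    KeyFn<Double[], Double> anonymousKey() {
        return new KeyFn<Double[], Double>() {
            @Override
            public Double getKey(Double[] prices) {
                return prices[0] != null ? prices[0] : prices[1];
            }
        };
    }

    // Static nested class: no reference to any PriceService instance.
    static class FirstNonNullPrice implements KeyFn<Double[], Double> {
        @Override
        public Double getKey(Double[] prices) {
            return prices[0] != null ? prices[0] : prices[1];
        }
    }
}

class SerializationCheck {

    // Returns true if Java serialization can write the object, false if it
    // fails with NotSerializableException.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Anonymous class: serialization also reaches the PriceService instance.
        System.out.println(canSerialize(new PriceService().anonymousKey()));
        // Static nested class: only the function object itself is written.
        System.out.println(canSerialize(new PriceService.FirstNonNullPrice()));
    }
}
```

In this sketch the first check should fail and the second should succeed. If the same thing is happening in your job, moving the KeySelector into a static nested or top-level class (or making the enclosing class serializable) would be worth trying.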

Best, Fabian

Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com>
wrote on Wed, 3 Apr 2019, 07:22:

> Hi Chesnay,
>
>
>
> Thanks for your support. ThresholdAcvFact class is a simple POJO with the
> following definition:
>
>
>
> public class ThresholdAcvFact {
>
>     private Long timePeriodId;
>     private Long geographyId;
>     private Long productId;
>     private Long customerId;
>     private Double basePrice;
>     private Double promoPrice;
>     private Double basePriceAcv;
>     private Double promoPriceAcv;
>     private Long count;
>
>     public Long getTimePeriodId() {
>         return timePeriodId;
>     }
>
>     public void setTimePeriodId(Long timePeriodId) {
>         this.timePeriodId = timePeriodId;
>     }
>
>     public Long getGeographyId() {
>         return geographyId;
>     }
>
>     public void setGeographyId(Long geographyId) {
>         this.geographyId = geographyId;
>     }
>
>     public Long getProductId() {
>         return productId;
>     }
>
>     public void setProductId(Long productId) {
>         this.productId = productId;
>     }
>
>     public Long getCustomerId() {
>         return customerId;
>     }
>
>     public void setCustomerId(Long customerId) {
>         this.customerId = customerId;
>     }
>
>     public Double getBasePrice() {
>         return basePrice;
>     }
>
>     public void setBasePrice(Double basePrice) {
>         this.basePrice = basePrice;
>     }
>
>     public Double getPromoPrice() {
>         return promoPrice;
>     }
>
>     public void setPromoPrice(Double promoPrice) {
>         this.promoPrice = promoPrice;
>     }
>
>     public Double getBasePriceAcv() {
>         return basePriceAcv;
>     }
>
>     public void setBasePriceAcv(Double basePriceAcv) {
>         this.basePriceAcv = basePriceAcv;
>     }
>
>     public Double getPromoPriceAcv() {
>         return promoPriceAcv;
>     }
>
>     public void setPromoPriceAcv(Double promoPriceAcv) {
>         this.promoPriceAcv = promoPriceAcv;
>     }
>
>     public Long getCount() {
>         return count;
>     }
>
>     public void setCount(Long count) {
>         this.count = count;
>     }
>
>     @Override
>     public String toString() {
>         return "ThresholdAcvFact{" +
>                 "timePeriodId=" + timePeriodId +
>                 ", geographyId=" + geographyId +
>                 ", productId=" + productId +
>                 ", customerId=" + customerId +
>                 ", basePrice=" + basePrice +
>                 ", promoPrice=" + promoPrice +
>                 ", basePriceAcv=" + basePriceAcv +
>                 ", promoPriceAcv=" + promoPriceAcv +
>                 ", count=" + count +
>                 '}';
>     }
> }
>
>
>
> The implementation of the function in which we hit the reported issue is
> the following:
>
>
>
> public DataSet<ThresholdAcvFact> transform(ThresholdAcvCalcSources thresholdAcvCalcSources, Long customerId) {
>
>     final DataSet<ThresholdAcvFact> basePriceFacts = getPriceFacts(
>             thresholdAcvCalcSources.getBasePriceDataSet(),
>             thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
>             new ThresholdAcvBasePriceFactMapper(customerId));
>
>     final DataSet<ThresholdAcvFact> promoPriceFacts = getPriceFacts(
>             thresholdAcvCalcSources.getPromoPriceDataSet(),
>             thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
>             new ThresholdAcvPromoPriceFactMapper(customerId));
>
>     return basePriceFacts
>             .fullOuterJoin(promoPriceFacts)
>             .where(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "basePrice")
>             .equalTo(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "promoPrice")
>             .with(new ThresholdAcvFactBasePromoPriceJoiner())
>             .groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID)
>             .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
>                 @Override
>                 public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
>                     return Optional.ofNullable(thresholdAcvFact.getBasePrice())
>                             .orElse(thresholdAcvFact.getPromoPrice());
>                 }
>             }, Order.ASCENDING)
>             .reduceGroup(new ThresholdAcvFactCountGroupReducer());
> }
>
>
>
> Regards,
>
> Konstantinos
>
>
>
> *From:* Chesnay Schepler <ches...@apache.org>
> *Sent:* Wednesday, 3 April 2019 12:59 PM
> *To:* Papadopoulos, Konstantinos
> <konstantinos.papadopou...@iriworldwide.com>; user@flink.apache.org
> *Subject:* Re: InvalidProgramException when trying to sort a group within
> a dataset
>
>
>
> Your user-defined functions are referencing the class
> "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl" which isn't
> serializable.
>
> My guess is that "ThresholdAcvFact" is a non-static inner class, however I
> would need to see the entire class to give an accurate analysis.
>
> On 02/04/2019 12:31, Papadopoulos, Konstantinos wrote:
>
> Hi all,
>
>
>
> I am trying to sort a group within a dataset using KeySelector as follows:
>
>
>
> in
>
>   .groupBy("productId", "timePeriodId", "geographyId")
>   .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
>     @Override
>     public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
>       return Optional.ofNullable(thresholdAcvFact.getBasePrice())
>           .orElse(thresholdAcvFact.getPromoPrice());
>     }
>   }, Order.ASCENDING)
>   .reduceGroup(/* do something */)
>
>
>
> And I am getting the following exception:
>
>
>
> org.apache.flink.api.common.InvalidProgramException: KeySelector
> group-sorting keys can only be used with KeySelector grouping keys.
>
>
>
>      at
> org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup(UnsortedGrouping.java:318)
>
>      at
> com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl.transform(ThresholdAcvCalcServiceImpl.java:91)
>
>      at
> com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:91)
>
>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>      at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>      at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>      at java.lang.reflect.Method.invoke(Method.java:498)
>
>      at
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
>
>      at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
>
>      at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
>
>      at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
>
>      at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
>
>      at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
>
>      at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
>
>      at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
>
>      at java.util.ArrayList.forEach(ArrayList.java:1257)
>
>      at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
>
>      at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
>
>      at java.util.ArrayList.forEach(ArrayList.java:1257)
>
>      at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
>
>      at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
>
>      at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
>
>      at
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>
>      at
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
>
>      at
> com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
>
>      at
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>
>      at
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>
>      at
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
>
>
>
> Then, I tried to use KeySelector for both ‘groupBy’ and ‘sortGroup’
> transformations as follows:
>
>
>
> in
>
>   .groupBy(new KeySelector<ThresholdAcvFact, Tuple3<Long, Long, Long>>() {
>     @Override
>     public Tuple3<Long, Long, Long> getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
>       return new Tuple3<>(thresholdAcvFact.getProductId(),
>           thresholdAcvFact.getTimePeriodId(), thresholdAcvFact.getGeographyId());
>     }
>   })
>   .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
>     @Override
>     public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
>       return Optional.ofNullable(thresholdAcvFact.getBasePrice())
>           .orElse(thresholdAcvFact.getPromoPrice());
>     }
>   }, Order.ASCENDING)
>   .reduceGroup(/* do something */)
>
>
>
> The job execution still failed with the following exception:
>
>
>
> org.apache.flink.optimizer.CompilerException: Error translating node 'Map
> "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED]
> ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could
> not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException:
> com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
>
>
>
>      at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:429)
>
>      at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:116)
>
>      at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>
>      at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>
>      at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>
>      at
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
>
>      at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:203)
>
>      at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:221)
>
>      at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>
>      at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
>
>      at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>
>      at
> com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:95)
>
>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>      at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>      at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>      at java.lang.reflect.Method.invoke(Method.java:498)
>
>      at
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
>
>      at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
>
>      at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
>
>      at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
>
>      at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
>
>      at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
>
>      at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
>
>      at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
>
>      at java.util.ArrayList.forEach(ArrayList.java:1257)
>
>      at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
>
>      at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
>
>      at java.util.ArrayList.forEach(ArrayList.java:1257)
>
>      at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
>
>      at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
>
>      at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
>
>      at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
>
>      at
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>
>      at
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
>
>      at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
>
>      at
> com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
>
>      at
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>
>      at
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>
>      at
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
>
> Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException:
> com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
>
>      at
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)
>
>      at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:897)
>
>      at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:374)
>
>      ... 50 more
>
> Caused by: java.io.NotSerializableException:
> com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
>
>      at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>
>      at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
>      at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
>      at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
>      at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
>      at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
>      at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
>      at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
>      at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
>      at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
>      at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
>      at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
>      at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
>      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>
>      at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
>
>      at
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:463)
>
>      at
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)
>
>      ... 52 more
>
>
>
> Does anyone have any idea how I can overcome these issues?
>
>
>
> Thanks in advance