Hi Dong,

> On Jan 4, 2024, at 10:18 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> Hi Ken,
>
> Sorry for the late reply. I didn't notice this email from you until now.
>
> In the scenario you described above, I don't think operator2 will see the
> result modified by operator1. Note that object re-use applies only to the
> transmission of data between operators in the same operator chain. But Flink
> won't put StreamX, operator1 and operator2 in the same operator chain when
> both operator1 and operator2 read the same output from StreamX.
>
> Would this answer your question?

Actually, operator2 will see the modified result. The test case below
illustrates this. It will fail when object reuse is enabled.

-- Ken

package com.scaleunlimited.flinksnippets;

import static org.junit.Assert.*;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.CloseableIterator;
import org.junit.Test;

public class ObjectReuseTest {

    @Test
    public void testObjectReuse() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        // Uncomment to enable object reuse; the value assertion below then fails.
        // env.getConfig().enableObjectReuse();

        DataStream<Event> stream1 = env.fromElements(
                new Event("A", 1));

        // First consumer of stream1: modifies the record in place.
        stream1.map((Event r) -> {
                r.setValue(r.getValue() * 2);
                return r;
            })
            .addSink(new DiscardingSink<>());

        // Second consumer of stream1: passes the record through unchanged.
        DataStream<Event> stream2 = stream1.map(r -> r);
        CloseableIterator<Event> results = stream2.collectAsync();

        env.execute();

        assertTrue(results.hasNext());
        Event result = results.next();
        // The second consumer should still see the original value.
        assertEquals(1, result.getValue());
        assertFalse(results.hasNext());
    }

    public static class Event {
        private String label;
        private long value;

        public Event() {}

        public Event(String label, long value) {
            this.label = label;
            this.value = value;
        }

        public String getLabel() {
            return label;
        }

        public void setLabel(String label) {
            this.label = label;
        }

        public long getValue() {
            return value;
        }

        public void setValue(long value) {
            this.value = value;
        }
    }
}

>
> On Fri, Oct 20, 2023 at 7:26 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> Hi Dong,
>>
>> Sorry for not seeing this initially.
>> I did have one question about the description of the issue in the FLIP:
>>
>>> However, in cases where the upstream and downstream operators do not store
>>> or access references to the input or output records, this deep-copy
>>> overhead becomes unnecessary
>>
>> I was interested in getting clarification as to what you meant by “or access
>> references…”, to see if it covered this situation:
>>
>> StreamX --forward--> operator1
>> StreamX --forward--> operator2
>>
>> If operator1 modifies the record, and object re-use is enabled, then
>> operator2 will see the modified version, right?
>>
>> Thanks,
>>
>> -- Ken
>>
>>> On Jul 2, 2023, at 7:24 PM, Xuannan Su <suxuanna...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> Dong (cc'ed) and I are opening this thread to discuss our proposal to
>>> add an operator attribute that allows an operator to specify support
>>> for object-reuse [1].
>>>
>>> Currently, the default configuration for pipeline.object-reuse is set
>>> to false to avoid data corruption, which can result in suboptimal
>>> performance. We propose adding APIs that operators can utilize to
>>> inform the Flink runtime whether it is safe to reuse the emitted
>>> records. This enhancement would enable Flink to maximize its
>>> performance using the default configuration.
>>>
>>> Please refer to the FLIP document for more details about the proposed
>>> design and implementation. We welcome any feedback and opinions on
>>> this proposal.
>>>
>>> Best regards,
>>>
>>> Dong and Xuannan
>>>
>>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink & Pinot

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink & Pinot