Hi Dong,

> On Jan 4, 2024, at 10:18 PM, Dong Lin <lindon...@gmail.com> wrote:
> 
> Hi Ken,
> 
> Sorry for the late reply. I didn't notice this email from you until now.
> 
> In the scenario you described above, I don't think operator2 will see the 
> result modified by operator1. Note that object re-use applies only to the 
> transmission of data between operators in the same operator chain. But Flink 
> won't put StreamX, operator1 and operator2 in the same operator chain when 
> both operator1 and operator2 read the same output from StreamX.
> 
> Would this answer your question?


Actually, operator2 will see the modified result.

The test case below illustrates this. It fails when object reuse is enabled (uncomment the env.getConfig().enableObjectReuse() line).

— Ken

package com.scaleunlimited.flinksnippets;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.CloseableIterator;
import static org.junit.Assert.*;
import org.junit.Test;

public class ObjectReuseTest {
    
    @Test
    public void testObjectReuse() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1);
        // Uncomment the next line to enable object reuse.
        // env.getConfig().enableObjectReuse();
        
        DataStream<Event> stream1 = env.fromElements(
                new Event("A", 1));
        
        // First branch (operator1): modifies the incoming record in place.
        stream1.map((Event r) -> {
                r.setValue(r.getValue() * 2);
                return r;
            })
            .addSink(new DiscardingSink<>());
        
        // Second branch (operator2): reads the same upstream record unchanged.
        DataStream<Event> stream2 = stream1.map(r -> r);
        
        CloseableIterator<Event> results = stream2.collectAsync();
               
        env.execute();
        
        assertTrue(results.hasNext());
        Event result = results.next();
        // The second branch should see the original, unmodified value.
        assertEquals(1, result.getValue());
        assertFalse(results.hasNext());
    }
    
    public static class Event {
        private String label;
        private long value;
        
        public Event() {}
        
        public Event(String label, long value) {
            this.label = label;
            this.value = value;
        }

        public String getLabel() {
            return label;
        }

        public void setLabel(String label) {
            this.label = label;
        }

        public long getValue() {
            return value;
        }

        public void setValue(long value) {
            this.value = value;
        }
    }
    
}
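
For readers without a Flink setup, the aliasing at issue can be sketched in plain Java. This is only a rough model, not Flink's implementation: the class FanOutAliasingSketch, the fanOut and run helpers, and the copy() method are hypothetical names made up for illustration. It also assumes the modifying consumer runs before the pass-through one. With copying, each consumer gets its own instance. With reuse, both share one instance, so the second consumer observes the first one's change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

public class FanOutAliasingSketch {

    /** Minimal mutable record, standing in for the Event POJO in the test. */
    public static final class Event {
        public final String label;
        public long value;

        public Event(String label, long value) {
            this.label = label;
            this.value = value;
        }

        public Event copy() {
            return new Event(label, value);
        }
    }

    /**
     * Hands one record to each consumer in order and returns the value each
     * consumer ends up with. With objectReuse every consumer receives the
     * same instance; without it every consumer receives its own copy.
     * (Illustrative model only; not how Flink is implemented.)
     */
    public static List<Long> fanOut(Event record, boolean objectReuse,
                                    List<UnaryOperator<Event>> consumers) {
        List<Long> seen = new ArrayList<>();
        for (UnaryOperator<Event> consumer : consumers) {
            Event input = objectReuse ? record : record.copy();
            seen.add(consumer.apply(input).value);
        }
        return seen;
    }

    /** Runs a doubling consumer, then a pass-through consumer, on Event("A", 1). */
    public static List<Long> run(boolean objectReuse) {
        UnaryOperator<Event> doubling = e -> { e.value *= 2; return e; };
        UnaryOperator<Event> passThrough = e -> e;
        return fanOut(new Event("A", 1), objectReuse,
                Arrays.asList(doubling, passThrough));
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [2, 1]
        System.out.println(run(true));  // prints [2, 2]
    }
}
```

The second element of each list is what the pass-through consumer sees: 1 when records are copied, 2 when the same instance is shared.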


> 
>  
> 
> On Fri, Oct 20, 2023 at 7:26 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> Hi Dong,
>> 
>> Sorry for not seeing this initially. I did have one question about the 
>> description of the issue in the FLIP:
>> 
>>> However, in cases where the upstream and downstream operators do not store 
>>> or access references to the input or output records, this deep-copy 
>>> overhead becomes unnecessary 
>> 
>> I was interested in getting clarification as to what you meant by “or access 
>> references…”, to see if it covered this situation:
>> 
>> StreamX —forward--> operator1
>> StreamX —forward--> operator2
>> 
>> If operator1 modifies the record, and object re-use is enabled, then 
>> operator2 will see the modified version, right?
>> 
>> Thanks,
>> 
>> — Ken
>> 
>>> On Jul 2, 2023, at 7:24 PM, Xuannan Su <suxuanna...@gmail.com> wrote:
>>> 
>>> Hi all,
>>> 
>>> Dong (cc'ed) and I are opening this thread to discuss our proposal to
>>> add an operator attribute that allows an operator to declare support
>>> for object-reuse [1].
>>> 
>>> Currently, the default configuration for pipeline.object-reuse is set
>>> to false to avoid data corruption, which can result in suboptimal
>>> performance. We propose adding APIs that operators can utilize to
>>> inform the Flink runtime whether it is safe to reuse the emitted
>>> records. This enhancement would enable Flink to maximize its
>>> performance using the default configuration.
>>> 
>>> Please refer to the FLIP document for more details about the proposed
>>> design and implementation. We welcome any feedback and opinions on
>>> this proposal.
>>> 
>>> Best regards,
>>> 
>>> Dong and Xuannan
>>> 
>>> [1] 
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>> 
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink & Pinot
>> 
>> 
>> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink & Pinot


