Hi,
I am looking for reviews and suggestions, regarding side input in ULR [1]. We have planned steps described in this doc [2]. A draft of PR, for the first step, is created. Would like to ask for suggestions on [1] (maybe [2] as well). Figured maybe getting some feedback would be of great help, in case I go too far with something less preferred. What this PR [1] has done (limited to Step One): 1. Setup and pass stateAPI server description, and contexts, correctly. 2. Creates a SideInput Handler (by borrowing ideas from Flink implementation) that does the KV look up. This basically implements a skeleton of SideInputHandler without wiring to ULR’s job graphs. 3. When runner sends back a constant integer, I can see data flowing correctly, up to the point of encoding happens. The example pipeline I use is a WordCount, with an integer sideinput added. What this PR [1] does not do yet (but otherwise should be, to complete Step One): 1. When runner sends back a constant Integer (1), there is an error during data encoding: “”” Caused by: java.lang.IllegalStateException: java.lang.ClassCastException: java.lang.Integer cannot be cast to [B at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41) at org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandlers$StateRequestHandlerToSideInputHandlerFactoryAdapter.handleGetRequest(StateRequestHandlers.java:300) at org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandlers$StateRequestHandlerToSideInputHandlerFactoryAdapter.handle(StateRequestHandlers.java:266) at org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandlers$StateKeyTypeDelegatingStateRequestHandler.handle(StateRequestHandlers.java:205) “”” I am still trying to understand why this coder throws CastException, please suggest if I did something wrong at high level. [1] https://issues.apache.org/jira/browse/BEAM-2928 [2] http://bit.ly/2EbqCKd Thanks! -- ================ Ruoyun Huang
