Absolutely, I think it's gone through quite a few iterations, but this is the current state of it (defined in a @Before function as part of scaffolding out the tests):
private lateinit var magicWindowHarness: KeyedOneInputStreamOperatorTestHarness<String, Log, FileOutput> @Before fun init() { magicWindowHarness = ProcessFunctionTestHarnesses.forKeyedProcessFunction( MagicWindowFunction(), { log -> log.getKey() }, TypeInformation.of(String::class.java) ) } I've also tried a few variants of that with a separate declaration for the function itself, etc. On Thu, Mar 4, 2021 at 6:47 AM Chesnay Schepler <ches...@apache.org> wrote: > Could you show us how you create test harness? > > On 3/4/2021 5:13 AM, Rion Williams wrote: > > Hi all, > > Early today I had asked a few questions regarding the use of the many > testing constructs available within Flink and believe that I have things in > a good direction at present. I did run into a specific case that either may > not be supported, or just isn't documented well enough for me to determine > what is going wrong. > > Basically, I have a KeyedProcessFunction that reads some global-level > configuration via GlobalJobParameters within its open function: > > override fun open(configuration: Configuration) { > // Omitted for brevity > > val parameters = runtimeContext.executionConfig.globalJobParameters > as? ParameterTool > if (parameters != null) { > processingTimeBuffer = parameters.getLong("processingTimeBuffer", > 0L) > } > } > > This works just as expected within the actual pipeline itself when set > similarly: > > streamEnvironment.config.globalJobParameters = parameters > > However, I don't see an effective way to set this against a TestHarness as > I've made several attempts but I never can seem to populate the > globalJobParameters property within the KeyedProcessFunction itself using a > test harness despite multiple attempts > > // Attempt 1 > magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters > = ParameterTool.fromMap(...) > > // Attempt 2 > magicWindowHarness.executionConfig.globalJobParameters = > ParameterTool.fromMap(...) > > // Attempt 3 > magicWindowHarness.environment.executionConfig.globalJobParameters = > ParameterTool.fromMap(...) > > // Attempt 4 > val env = StreamExecutionEnvironment. > env.config.globalJobParameters = ParameterTool.fromMap(...) > > Is this supported or am I simply going about it the wrong way? Or even > just perhaps missing a piece of the puzzle? > > Thanks much, > > Rion > > >