Absolutely,

I think it's gone through quite a few iterations, but this is the current
state of it (defined in a @Before function as part of scaffolding out the
tests):

private lateinit var magicWindowHarness:
KeyedOneInputStreamOperatorTestHarness<String, Log, FileOutput>

@Before
fun init() {
    magicWindowHarness =
ProcessFunctionTestHarnesses.forKeyedProcessFunction(
        MagicWindowFunction(),
        { log -> log.getKey() },
        TypeInformation.of(String::class.java)
    )
}

I've also tried a few variants of that with a separate declaration for the
function itself, etc.



On Thu, Mar 4, 2021 at 6:47 AM Chesnay Schepler <ches...@apache.org> wrote:

> Could you show us how you create test harness?
>
> On 3/4/2021 5:13 AM, Rion Williams wrote:
>
> Hi all,
>
> Early today I had asked a few questions regarding the use of the many
> testing constructs available within Flink and believe that I have things in
> a good direction at present. I did run into a specific case that either may
> not be supported, or just isn't documented well enough for me to determine
> what is going wrong.
>
> Basically, I have a KeyedProcessFunction that reads some global-level
> configuration via GlobalJobParameters within its open function:
>
> override fun open(configuration: Configuration) {
>     // Omitted for brevity
>
>     val parameters = runtimeContext.executionConfig.globalJobParameters
> as? ParameterTool
>     if (parameters != null) {
>         processingTimeBuffer = parameters.getLong("processingTimeBuffer",
> 0L)
>     }
> }
>
> This works just as expected within the actual pipeline itself when set
> similarly:
>
> streamEnvironment.config.globalJobParameters = parameters
>
> However, I don't see an effective way to set this against a TestHarness as
> I've made several attempts but I never can seem to populate the
> globalJobParameters property within the KeyedProcessFunction itself using a
> test harness despite multiple attempts
>
> // Attempt 1
> magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters
> = ParameterTool.fromMap(...)
>
> // Attempt 2
> magicWindowHarness.executionConfig.globalJobParameters =
> ParameterTool.fromMap(...)
>
> // Attempt 3
> magicWindowHarness.environment.executionConfig.globalJobParameters =
> ParameterTool.fromMap(...)
>
> // Attempt 4
> val env = StreamExecutionEnvironment.
> env.config.globalJobParameters = ParameterTool.fromMap(...)
>
> Is this supported or am I simply going about it the wrong way? Or even
> just perhaps missing a piece of the puzzle?
>
> Thanks much,
>
> Rion
>
>
>

Reply via email to