Hi Abhishek,
I did the same as you and tested my job with a parquet StreamingFileSink via
a snapshot. (And afterwards ran a small Spark job on the parquet output,
asserting that my Flink output is correct.)
Good news for you: it is easily possible to stop the job with a
savepoint. You are already on the right track.
After you build your MiniCluster like so:
private MiniClusterWithClientResource buildTestMiniCluster(
        Configuration flinkClusterConfig) throws Exception {
    MiniClusterWithClientResource flinkTestCluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(flinkClusterConfig)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(1)
            .build());
    flinkTestCluster.before();
    return flinkTestCluster;
}
you can receive a detached cluster client from it:
private ClusterClient<?> getDetachedClusterClient(
        MiniClusterWithClientResource flinkTestCluster) {
    ClusterClient<?> flinkClient = flinkTestCluster.getClusterClient();
    flinkClient.setDetached(true);
    return flinkClient;
}
Now, in your test, you build the JobGraph and submit the job:
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
JobSubmissionResult submissionResult =
    flinkClient.submitJob(jobGraph, getClass().getClassLoader());
Afterwards, you can easily stop the job with a savepoint via that client:
flinkClient.stopWithSavepoint(submissionResult.getJobID(), false,
    savepointDir.toURI().toString());
In my case, I store the savepoint in an ignored JUnit TemporaryFolder, because
I only care about the written parquet file, not the savepoint itself.
That's all nice. The not-so-nice part is that you can't easily tell when the
job has actually processed all elements, i.e. when it is safe to trigger
stopping the pipeline with the savepoint. For this purpose, I use a public
static Semaphore in each of my integration tests with savepoints. As all my
jobs read from a Kafka source (using com.salesforce.kafka.test.junit4.
SharedKafkaTestResource), I have a custom KafkaDeserializationSchema extending
the default one from my job and implement isEndOfStream:
@Override
public boolean isEndOfStream(V nextElement) {
    if ("EOF".equals(nextElement.getId())) {
        LOCK.release();
    }
    return false;
}
}
Finally, in my test setup I finish writing the test data with a marker message
"EOF". When that is received, the semaphore is released and the unit test
thread executes the flinkClient.stopWithSavepoint line. This works because the
MiniCluster runs in the same JVM, and because the savepoint/checkpoint barrier
flows through the pipeline and can't overtake my prior messages, all data will
have been written by then. Since stopWithSavepoint runs synchronously, I can
run my assertions right after that line of code.
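The coordination pattern above can be sketched with plain JDK classes, no Flink involved (the class name and messages here are illustrative, not from the actual job):

```java
import java.util.concurrent.Semaphore;

public class EofCoordinationSketch {
    // Shared between the "pipeline" thread and the test thread,
    // like the public static Semaphore used in the deserialization schema.
    static final Semaphore LOCK = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {
        String[] messages = {"a", "b", "EOF"};

        // Stands in for the job consuming the Kafka source.
        Thread pipeline = new Thread(() -> {
            for (String msg : messages) {
                if ("EOF".equals(msg)) {
                    LOCK.release(); // signal: all test data has been seen
                }
            }
        });
        pipeline.start();

        // The test thread blocks until the EOF marker has been consumed;
        // at this point it would call flinkClient.stopWithSavepoint(...).
        LOCK.acquire();
        System.out.println("EOF seen, safe to stop with savepoint");
        pipeline.join();
    }
}
```

The semaphore starts with zero permits, so acquire() blocks the test thread until the pipeline thread sees the marker and calls release().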
Hope that helps.
Best regards
Theo
From: "David Anderson" <[email protected]>
To: "Abhishek Rai" <[email protected]>
CC: "user" <[email protected]>
Sent: Friday, June 12, 2020 20:21:07
Subject: Re: Restore from savepoint through Java API
You can study LocalStreamingFileSinkTest [1] for an example of how to approach
this. You can use the test harnesses [2], keeping in mind that
- initializeState is called during instance creation
- the provided context indicates if state is being restored from a snapshot
- snapshot is called when taking a checkpoint
- notifyOfCompletedCheckpoint is called when a checkpoint is complete
The outline of such a test might follow this pattern:
testHarness1.setup();
testHarness1.initializeState(initState);
testHarness1.open();
// setup state to checkpoint ...
// capture snapshot
snapshot = testHarness1.snapshot(checkpointId, timestamp);
// process more data, the effects of which will be lost ...
// create a new test harness initialized with the state from the snapshot
testHarness2.setup();
testHarness2.initializeState(snapshot);
testHarness2.open();
// verify the state ...
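The create–snapshot–restore–verify flow of that outline can also be illustrated without any Flink dependencies; here is a plain-Java analogue in which a toy stateful "operator" stands in for the test harness (all names here are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Toy stateful "operator": accumulates elements and can snapshot/restore state.
class ToyOperator {
    private final List<String> state = new ArrayList<>();

    void process(String element) { state.add(element); }

    // Analogous to testHarness1.snapshot(checkpointId, timestamp)
    List<String> snapshot() { return new ArrayList<>(state); }

    // Analogous to testHarness2.initializeState(snapshot)
    void initializeState(List<String> restored) {
        state.clear();
        state.addAll(restored);
    }

    List<String> state() { return state; }
}

public class SnapshotRestoreSketch {
    public static void main(String[] args) {
        ToyOperator op1 = new ToyOperator();
        op1.process("a");
        op1.process("b");
        List<String> snapshot = op1.snapshot(); // capture snapshot

        op1.process("c"); // processed after the snapshot: its effect is lost

        ToyOperator op2 = new ToyOperator();
        op2.initializeState(snapshot);          // restore from the snapshot
        System.out.println(op2.state());        // prints [a, b] -- "c" is gone
    }
}
```

The point of the pattern is the same as in the real harness test: anything processed after the snapshot is absent from the restored instance, which is exactly what the verification step asserts.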
David
[1] https://github.com/apache/flink/blob/release-1.10/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators
On Thu, Jun 11, 2020 at 12:12 PM Abhishek Rai <[email protected]> wrote:
Hello,
I'm writing a test for my custom sink function. The function is stateful and
relies on checkpoint restores for maintaining consistency with the external
system that it's writing to. For integration testing of the sink function, I
have a MiniCluster-based environment inside a single JVM through which I create
my job and validate its operation.
In order to test the checkpoint restore behavior with precision, I've disabled
checkpointing and am instead using savepoints. So, my test proceeds as follows:
1. Start a job.
2. Push some data through it to the sink and to an external system.
3. Trigger a savepoint.
4. Push more data.
5. Cancel the job.
6. Restore from the savepoint captured in step 3 above.
I can't seem to find a Java API for restoring a job from a savepoint. The
approach in the documentation and other resources is to use the CLI, which is
not an option for me. Currently, I create a RemoteStreamEnvironment with
savepointRestoreSettings set, but when I run execute(), I get the following
error:
java.lang.IllegalStateException: No operators defined in streaming topology.
Cannot execute.
var savepointDir = restClusterClient_.triggerSavepoint(jobId, tmpdir).get();
assertTrue(!savepointDir.isBlank());
// Cancel the job and launch a new one from the savepoint.
restClusterClient_.cancel(jobId).get();
var restoreSettings = SavepointRestoreSettings.forPath(savepointDir);
var env = new RemoteStreamEnvironment(
    flinkMiniCluster_.host(),
    flinkMiniCluster_.port(),
    null,
    new String[] {},
    null,
    restoreSettings);
var restoredJob = env.executeAsync();
Separately, is there a Flink testing utility I could use for integration
testing of state checkpointing and recovery?
Thanks,
Abhishek