Re: Re: How to check checkpointing mode
Hi Yun,

It is confusing, but the UI now shows the expected value "At Least Once" (and, obviously, checkpointCfg#checkpointingMode shows AT_LEAST_ONCE as well). Clearly, I either looked in the wrong place, or the job was not upgraded when I changed the checkpointing mode...

Sorry for the noise, and thank you for your help.
Alexey

From: Yun Gao
Sent: Monday, March 8, 2021 7:14 PM
To: Alexey Trenikhun; Flink User Mail List
Subject: Re: Re: How to check checkpointing mode

Hi Alexey,

Sorry, I also do not see any problems in the attached code. Could you add a breakpoint at `see.execute(name)` and have a look at the value of see#checkpointCfg#checkpointingMode?

Best,
Yun

-- Original Mail --
Sender: Alexey Trenikhun
Send Date: Tue Mar 9 07:25:31 2021
Recipients: Flink User Mail List, Yun Gao
Subject: Re: How to check checkpointing mode

Hi Yun,

Thank you for looking. The job creation code is quite big, so I've truncated the helper methods dealing with command-line parameters etc. Below are the two major methods:

    @Override
    public Void call() throws Exception {
        LOGGER.info("{}", new Info().toLog());
        if (!allParameters.isEmpty()) {
            // We don't expect any parameters, but Flink 1.12 adds JVM options to the job args. Since we add
            // -- after the job's arguments, these (for us unnecessary) arguments are treated as positional
            // parameters, which we ignore, but we log a warning.
            LOGGER.warn("Unexpected parameters: {}", allParameters);
        }
        try {
            final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
            see.execute(name);
            return null;
        } catch (InterruptedException e) {
            LOGGER.error("Stream Processor was interrupted", e);
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            LOGGER.error("Stream Processor is terminated due to exception", e);
            throw e;
        }
    }

    private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws IOException {
        initDefaultKafkaSource();
        final long deviationMillis = deviation.toMillis();
        final GlobalAppConfig globalAppConfig = config();
        final StreamExecutionEnvironment see = StreamExecutionEnvironment
                .getExecutionEnvironment()
                .enableCheckpointing(checkpointInterval.toMillis(), CheckpointingMode.AT_LEAST_ONCE)
                .setMaxParallelism(1024)
                .setParallelism(parallelism);
        if (externalizedCheckpoints) {
            see.getCheckpointConfig()
                    .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
        see.getConfig().disableGenericTypes();
        see.getConfig().disableAutoGeneratedUIDs();
        configureStateBackend(see);
        final Properties producerProperties = new PropertiesBuilder()
                .putAll(kafkaCommonOptions)
                .putAll(kafkaProducerOptions)
                .varFiles(valueFiles)
                .build();
        final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
                .semantic(Semantic.AT_LEAST_ONCE)
                .config(producerProperties)
                .build();
        final AutoTopic autoTopic = AutoTopic.builder()
                .config(producerProperties)
                .partitions(autoCreateTopicsPartitions)
                .replicationFactor(autoCreateTopicsReplicationFactor)
                .doNotCreateTopics(ImmutableSet.of(
                        gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
                ))
                .build();
        see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)));
        // Since Flink 1.12 the default stream characteristic is event time,
        // so we don't need to set streamTimeCharacteristic; furthermore, the whole TimeCharacteristic enum
        // is deprecated.
        // If needed, explicitly using processing-time windows and timers works in event-time mode.
        addHeartbeats(see);
        final TStateCleanupOnTimeout.Factory cleanupFactory = new TStateCleanupOnTimeout.Factory(
                maxCallDuration,
                postmortemCallDuration,
                globalAppConfig.timerGranularity()
        );
        @Nullable final SingleOutputStreamOperator cfgXform;
        @Nullable final DataStream cfgSource = addSources(see, SourceTopic.GCA_CFG,
                new CfgJsonDeserializationSchema(),
                (event, timestamp) -> event.getBatchId(),
                it -> !it.getHeartbeat());
        if (cfgSource != null) {
            cfgXform = cfgSource
                    .keyBy(PbCfgDatum::getCcId)
                    .process(new CfgTransform())
                    .uid("xform-cfg")
                    .name("XForm Config");
            if (!isNullOrEmpty(gspCfg)) {
                cfgXform.addSink(producerFactory.create(gspCfg,
                                autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg))))
                        .uid("uid-" + gspCfg)
                        .name(gspCfg);
            } else {
                cfgXform.addSink(new DiscardingSink<>())
                        .uid("uid-gsp-cfg-null")
                        .name("gsp-cfg-null");
            }
        } else {
            cfgXform = null;
        }
        final DataStream voiceCallThreadSource = addSources(see, SourceTopic.VOICE_CALL_THREAD,
                callThreadFormat
tionsProcessFunction(globalAppConfig, cleanupFactory))
                .uid("xform-voice-interactions")
                .name("Xform Voice interactions");
        if (digitalPreTransform != null) {
            final DataStream ixnOutputStream = digitalPreTransform
                    .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.DIGITAL_IRF))
                    .keyBy(DixnEventKey::new, DixnEventKey::new)
                    .process(new DixnTransform(globalAppConfig))
                    .uid("xform-digital-interactions")
                    .name("Xform Digital Interactions");
            interactionDataStream = interactionDataStream.union(ixnOutputStream);
        }
        interactionDataStream = AsyncDataStream.orderedWait(
                        interactionDataStream,
                        new ResourceInfo(cfgCacheUrl, provideCfgDictionarySpec()),
                        cfgCacheTimeout.toMillis(),
                        TimeUnit.MILLISECONDS,
                        (ResourceInfo.IO_CAPACITY + see.getConfig().getParallelism() - 1) / see.getConfig()
                                .getParallelism())
                .uid("uid-config-async-io")
                .name("Enrich interactions with config");
        if (!gspIxn.isEmpty()) {
            interactionDataStream.addSink(
                            producerFactory.create(
                                    gspIxn,
                                    autoTopic.decorate(new InteractionSerializationSchema(
                                            outputTopicFormat, gspIxn, producerFactory.config()))))
                    .uid("uid-" + gspIxn)
                    .name(gspIxn);
        } else {
            interactionDataStream
                    .addSink(new DiscardingSink<>())
                    .uid("uid-gsp-ixn-null")
                    .name("gsp-ixn-null");
        }
        if (!isNullOrEmpty(gspCustom)) {
            final DataStream uxEventsVoice = tcmDataStream1
                    .getSideOutput(TIntakeProcessFunction.UX_EVENTS_TAG);
            final SingleOutputStreamOperator customFactsVoice = uxEventsVoice
                    .keyBy(UxKey::new)
                    .process(new UxToCustomFact(globalAppConfig))
                    .uid("xform-voice-ux")
                    .name("Xform Voice Ux");
            SingleOutputStreamOperator customFactsDigital = null;
            if (sortedIxnEventStream != null) {
                DataStream uxEventsDigital = sortedIxnEventStream
                        .getSideOutput(UnwrapEnvelopeAndSort.UX_EVENTS_TAG);
                customFactsDigital = uxEventsDigital
                        .keyBy(UxKey::new)
                        .process(new UxToCustomFact(globalAppConfig))
                        .uid("xform-digital-ux")
                        .name("Xform Digital Ux");
            }
            final DataStream customFacts = customFactsDigital == null ?
                    customFactsVoice : customFactsVoice.union(customFactsDigital);
            customFacts.addSink(
                            producerFactory.create(
                                    globalAppConfig.gspCustom(),
                                    autoTopic.decorate(
                                            new CustomFactSerializationSchema(
                                                    outputTopicFormat, globalAppConfig.gspCustom()))))
                    .uid("uid-" + globalAppConfig.gspCustom())
                    .name(globalAppConfig.gspCustom());
        }
        if (!isNullOrEmpty(voiceOutbound)) {
            final OcsStateCleanupOnTimeout.Factory ocsStateCleanupFactory = new OcsStateCleanupOnTimeout.Factory(
                    maxCampaignGroupSessionDuration,
                    globalAppConfig.timerGranularity()
            );
            @Nullable final DataStream ocsAgentDetails = agentInfo != null
                    ? agentInfo.getSideOutput(CfgAgentInfoRegistry.CAF_SUBJECT_INFO_OUTPUT_TAG)
                    : null;
            final SingleOutputStreamOperator outboundDataStream = ocsDataStream
                    .connect(ocsAgentDetails)
                    .keyBy(OcsEventKey::new, OcsEventKey::new)
                    .process(new OcsProcessFunction(globalAppConfig, ocsStateCleanupFactory))
                    .uid("xform-outbound")
                    .name("Xform Outbound");
            if (!gspOutbound.isEmpty()) {
                outboundDataStream.addSink(
                                producerFactory.create(
                                        gspOutbound,
                                        autoTopic.decorate(
                                                new OutboundSerializationSchema(outputTopicFormat, gspOutbound))))
                        .uid("uid-" + globalAppConfig.gspOutbound())
                        .name(globalAppConfig.gspOutbound());
            } else {
                outboundDataStream
                        .addSink(new DiscardingSink<>())
                        .uid("uid-gsp-outbound-null")
                        .name("gsp-outbound-null");
            }
        }
        TApp.main(see);
        return see;
    }

Thanks,
Alexey

From: Yun Gao
Sent: Monday, March 8, 2021 7:57 AM
To: Alexey Trenikhun; Flink User Mail List
Subject: Re: How to check checkpointing mode

Hi Alexey,

Logically, the setting in the code has the highest priority. Could you show the complete job creation code? I think it is unusual to enable checkpointing on an anonymous StreamExecutionEnvironment.

Best,
Yun

--
From: Alexey Trenikhun
Send Time: 2021 Mar. 6 (Sat.) 01:02
To: Flink User Mail List
Subject: How to check checkpointing mode

Hello,

My job sets the checkpointing mode to at-least-once:

    StreamExecutionEnvironment
            .getExecutionEnvironment()
            .enableCheckpointing(checkpointInterval.toMillis(), CheckpointingMode.AT_LEAST_ONCE)

but the Flink UI shows Checkpointing Mode: Exactly Once. Why is that? Does Flink for some reason decide to ignore my setting (by the way, flink-conf.yaml also has execution.checkpointing.mode: AT_LEAST_ONCE)? Is there any other way to check what the actual checkpointing mode is?

Thanks,
Alexey
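Besides logging `see.getCheckpointConfig().getCheckpointingMode()` just before `see.execute(name)`, one other way to see which mode a running job actually uses is to query the JobManager REST API directly, since that is where the web UI's checkpoint configuration view gets its data: `GET /jobs/<jobid>/checkpoints/config` returns the job's checkpoint configuration, including a `mode` field (values along the lines of `exactly_once` or `at_least_once`). The sketch below is a minimal illustration under stated assumptions: the REST base URL (e.g. `http://localhost:8081`) is your own JobManager address, Java 11+ is available for `java.net.http`, and the class `CheckpointModeCheck` with its helpers `checkpointConfigUrl`/`extractMode` is hypothetical. The regex is only a dependency-free stand-in for a proper JSON parser.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: asks the Flink REST API which checkpointing mode a running job uses.
public class CheckpointModeCheck {

    // Matches the exact key "mode" in the checkpoint-config JSON, e.g. "mode":"at_least_once".
    // Assumption: a regex is good enough for this one flat string field; prefer a JSON library in real code.
    private static final Pattern MODE = Pattern.compile("\"mode\"\\s*:\\s*\"([A-Za-z_]+)\"");

    /** Builds the checkpoint-config URL for a job, tolerating a trailing slash on the base URL. */
    static String checkpointConfigUrl(String restBase, String jobId) {
        String base = restBase.endsWith("/") ? restBase.substring(0, restBase.length() - 1) : restBase;
        return base + "/jobs/" + jobId + "/checkpoints/config";
    }

    /** Returns the mode upper-cased (e.g. AT_LEAST_ONCE), or null if the field is absent. */
    static String extractMode(String json) {
        Matcher m = MODE.matcher(json);
        return m.find() ? m.group(1).toUpperCase(Locale.ROOT) : null;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        if (args.length < 2) {
            System.err.println("usage: CheckpointModeCheck <rest-base-url> <job-id>");
            return;
        }
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(checkpointConfigUrl(args[0], args[1])))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode() + ", mode = " + extractMode(response.body()));
    }
}
```

Usage would be something like `java CheckpointModeCheck http://localhost:8081 <job-id>`, with the job ID taken from the web UI or `flink list`. For the job in this thread the expected answer is `AT_LEAST_ONCE`. If it reports `EXACTLY_ONCE` instead, one likely explanation, as turned out to be the case here, is that the running job predates the code change.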