Re: Re: How to check checkpointing mode

2021-03-09 Thread Alexey Trenikhun
Hi Yun,
It is confusing, but the UI now shows the expected value "At Least Once" (obviously 
checkpointCfg#checkpointingMode shows AT_LEAST_ONCE as well). Clearly I either 
looked in the wrong place, or the job was not upgraded when I changed the 
checkpointing mode ...

Sorry for the noise, and thank you for your help.

Alexey


From: Yun Gao 
Sent: Monday, March 8, 2021 7:14 PM
To: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: Re: How to check checkpointing mode

Hi Alexey,

Sorry, I also do not see any problems in the attached code. Could you add
a breakpoint at `see.execute(name)` and have a look at the value of
see#checkpointCfg#checkpointingMode?

Best,
Yun

--Original Mail --
Sender:Alexey Trenikhun 
Send Date:Tue Mar 9 07:25:31 2021
Recipients:Flink User Mail List , Yun Gao 

Subject:Re: How to check checkpointing mode
Hi Yun,
Thank you for looking. The job creation code is quite big, so I've cut out the 
helper methods dealing with command-line parameters etc. Below are the two major 
methods:


@Override
public Void call() throws Exception {
  LOGGER.info("{}", new Info().toLog());

  if (!allParameters.isEmpty()) {
    // We don't expect any parameters, but Flink 1.12 adds JVM options to the job
    // args. Since we add -- after the job's arguments, these arguments, which are
    // unnecessary for us, are treated as positional parameters; we ignore them
    // but log a warning.
    LOGGER.warn("Unexpected parameters: {}", allParameters);
  }
  try {
    final StreamExecutionEnvironment see = buildStreamExecutionEnvironment();
    see.execute(name);
    return null;
  } catch (InterruptedException e) {
    LOGGER.error("Stream Processor was interrupted", e);
    Thread.currentThread().interrupt();
    throw e;
  } catch (Exception e) {
    LOGGER.error("Stream Processor is terminated due to exception", e);
    throw e;
  }
}


private StreamExecutionEnvironment buildStreamExecutionEnvironment() throws IOException {
  initDefaultKafkaSource();
  final long deviationMillis = deviation.toMillis();
  final GlobalAppConfig globalAppConfig = config();
  final StreamExecutionEnvironment see = StreamExecutionEnvironment
  .getExecutionEnvironment()
  .enableCheckpointing(checkpointInterval.toMillis(),
  CheckpointingMode.AT_LEAST_ONCE)
  .setMaxParallelism(1024)
  .setParallelism(parallelism);
  if (externalizedCheckpoints) {
    see.getCheckpointConfig()
        .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }
  see.getConfig().disableGenericTypes();
  see.getConfig().disableAutoGeneratedUIDs();
  configureStateBackend(see);

  final Properties producerProperties = new PropertiesBuilder()
  .putAll(kafkaCommonOptions)
  .putAll(kafkaProducerOptions)
  .varFiles(valueFiles)
  .build();

  final KafkaProducerFactory producerFactory = KafkaProducerFactory.builder()
  .semantic(Semantic.AT_LEAST_ONCE)
  .config(producerProperties)
  .build();

  final AutoTopic autoTopic = AutoTopic.builder()
  .config(producerProperties)
  .partitions(autoCreateTopicsPartitions)
  .replicationFactor(autoCreateTopicsReplicationFactor)
  .doNotCreateTopics(ImmutableSet.of(
  gspCfg, gspCustom, gspIxn, gspOutbound, gspSm
  ))
  .build();

  see.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.minutes(1)));
  // Since Flink 1.12 the default stream time characteristic is event time,
  // so we don't need to set streamTimeCharacteristic; furthermore, the whole
  // TimeCharacteristic enum is deprecated.
  // If needed, explicitly using processing-time windows and timers works in
  // event-time mode.

  addHeartbeats(see);
  final TStateCleanupOnTimeout.Factory cleanupFactory =
  new TStateCleanupOnTimeout.Factory(
  maxCallDuration,
  postmortemCallDuration,
  globalAppConfig.timerGranularity()
  );

  @Nullable final SingleOutputStreamOperator cfgXform;
  @Nullable final DataStream cfgSource = addSources(see,
  SourceTopic.GCA_CFG,
  new CfgJsonDeserializationSchema(),
  (event, timestamp) -> event.getBatchId(),
  it -> !it.getHeartbeat());

  if (cfgSource != null) {
    cfgXform = cfgSource
        .keyBy(PbCfgDatum::getCcId)
        .process(new CfgTransform())
        .uid("xform-cfg")
        .name("XForm Config");

    if (!isNullOrEmpty(gspCfg)) {
      cfgXform.addSink(producerFactory.create(gspCfg,
              autoTopic.decorate(new CfgJsonSerializationSchema(gspCfg))))
          .uid("uid-" + gspCfg)
          .name(gspCfg);
    } else {
      cfgXform.addSink(new DiscardingSink<>())
          .uid("uid-gsp-cfg-null")
          .name("gsp-cfg-null");
    }
  } else {
    cfgXform = null;
  }

  final DataStream voiceCallThreadSource = addSources(see,
  SourceTopic.VOICE_CALL_THREAD,
  callThreadFormat 

Re: Re: How to check checkpointing mode

2021-03-08 Thread Yun Gao
tionsProcessFunction(globalAppConfig, 
cleanupFactory))
  .uid("xform-voice-interactions")
  .name("Xform Voice interactions");

  if (digitalPreTransform != null) {
    final DataStream ixnOutputStream = digitalPreTransform
        .connect(allAgentLoginSession.getSideOutput(AgentStateTransformation.DIGITAL_IRF))
        .keyBy(DixnEventKey::new, DixnEventKey::new)
        .process(new DixnTransform(globalAppConfig))
        .uid("xform-digital-interactions")
        .name("Xform Digital Interactions");
    interactionDataStream = interactionDataStream.union(ixnOutputStream);
  }

  interactionDataStream = AsyncDataStream.orderedWait(
  interactionDataStream,
  new ResourceInfo(cfgCacheUrl, provideCfgDictionarySpec()),
  cfgCacheTimeout.toMillis(),
  TimeUnit.MILLISECONDS,
  (ResourceInfo.IO_CAPACITY + see.getConfig().getParallelism() - 1)
      / see.getConfig().getParallelism())
  .uid("uid-config-async-io")
  .name("Enrich interactions with config");

  if (!gspIxn.isEmpty()) {
    interactionDataStream.addSink(
            producerFactory.create(
                gspIxn,
                autoTopic.decorate(new InteractionSerializationSchema(
                    outputTopicFormat, gspIxn, producerFactory.config()))))
        .uid("uid-" + gspIxn)
        .name(gspIxn);
  } else {
    interactionDataStream
        .addSink(new DiscardingSink<>())
        .uid("uid-gsp-ixn-null")
        .name("gsp-ixn-null");
  }

  if (!isNullOrEmpty(gspCustom)) {
    final DataStream uxEventsVoice = tcmDataStream1
        .getSideOutput(TIntakeProcessFunction.UX_EVENTS_TAG);
    final SingleOutputStreamOperator customFactsVoice =
        uxEventsVoice
            .keyBy(UxKey::new)
            .process(new UxToCustomFact(globalAppConfig))
            .uid("xform-voice-ux")
            .name("Xform Voice Ux");

    SingleOutputStreamOperator customFactsDigital = null;
    if (sortedIxnEventStream != null) {
      DataStream uxEventsDigital = sortedIxnEventStream
          .getSideOutput(UnwrapEnvelopeAndSort.UX_EVENTS_TAG);
      customFactsDigital =
          uxEventsDigital
              .keyBy(UxKey::new)
              .process(new UxToCustomFact(globalAppConfig))
              .uid("xform-digital-ux")
              .name("Xform Digital Ux");
    }

    final DataStream customFacts = customFactsDigital == null
        ? customFactsVoice
        : customFactsVoice.union(customFactsDigital);
    customFacts.addSink(
            producerFactory.create(
                globalAppConfig.gspCustom(),
                autoTopic.decorate(
                    new CustomFactSerializationSchema(
                        outputTopicFormat, globalAppConfig.gspCustom()))))
        .uid("uid-" + globalAppConfig.gspCustom())
        .name(globalAppConfig.gspCustom());
  }

  if (!isNullOrEmpty(voiceOutbound)) {
    final OcsStateCleanupOnTimeout.Factory ocsStateCleanupFactory =
        new OcsStateCleanupOnTimeout.Factory(
            maxCampaignGroupSessionDuration,
            globalAppConfig.timerGranularity()
        );

    @Nullable final DataStream ocsAgentDetails = agentInfo != null
        ? agentInfo.getSideOutput(CfgAgentInfoRegistry.CAF_SUBJECT_INFO_OUTPUT_TAG)
        : null;

    final SingleOutputStreamOperator outboundDataStream = ocsDataStream
        .connect(ocsAgentDetails)
        .keyBy(OcsEventKey::new, OcsEventKey::new)
        .process(new OcsProcessFunction(globalAppConfig, ocsStateCleanupFactory))
        .uid("xform-outbound")
        .name("Xform Outbound");

    if (!gspOutbound.isEmpty()) {
      outboundDataStream.addSink(
              producerFactory.create(
                  gspOutbound,
                  autoTopic.decorate(
                      new OutboundSerializationSchema(outputTopicFormat, gspOutbound))))
          .uid("uid-" + globalAppConfig.gspOutbound())
          .name(globalAppConfig.gspOutbound());
    } else {
      outboundDataStream
          .addSink(new DiscardingSink<>())
          .uid("uid-gsp-outbound-null")
          .name("gsp-outbound-null");
    }
  }

  TApp.main(see);
  return see;
}

Thanks,
Alexey

From: Yun Gao 
Sent: Monday, March 8, 2021 7:57 AM
To: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: How to check checkpointing mode
Hi Alexey,

Logically, the setting in the code has the highest
priority.

Could you show the complete code for the job creation?
It seems unusual to enable checkpointing
on an anonymous StreamExecutionEnvironment.


Best,
Yun


--
From:Alexey Trenikhun 
Send Time:2021 Mar. 6 (Sat.) 01:02
To:Flink User Mail List 
Subject:How to check checkpointing mode

Hello,

My job sets checkpointing mode to at-least-once:

StreamExecutionEnvironment
.getExecutionEnvironment()
.enableCheckpointing(checkpointInterval.toMillis(),
CheckpointingMode.AT_LEAST_ONCE)

but Flink UI shows Checkpointing Mode: Exactly Once:


Why is that? Does Flink for some reason decide to ignore my setting (btw, 
flink-conf.yaml also has execution.checkpointing.mode: AT_LEAST_ONCE)? Is there 
any other way to check what the actual checkpointing mode is?

Thanks,
Alexey
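
On the question of another way to check the actual checkpointing mode: besides
the UI, the JobManager REST API exposes the checkpoint configuration of a
running job at /jobs/<jobid>/checkpoints/config. The following is only a
minimal sketch under some assumptions, not a definitive tool: the class and
method names are made up for illustration, the REST base URL (for example
http://localhost:8081) and the job id have to be supplied by the caller, and
the "mode" field is pulled out with a regular expression instead of a real
JSON parser to keep the example dependency-free.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper class; the name is not part of Flink.
class CheckpointModeCheck {

    // Matches a "mode" field in the response body, e.g. "mode":"at_least_once".
    // Assumption: the checkpoint config response carries the mode in this field.
    private static final Pattern MODE =
            Pattern.compile("\"mode\"\\s*:\\s*\"([^\"]+)\"");

    /** Extracts the value of the first "mode" field from the JSON body, if any. */
    static Optional<String> extractMode(String json) {
        Matcher m = MODE.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Fetches <restBase>/jobs/<jobId>/checkpoints/config and returns the raw body. */
    static String fetchCheckpointConfig(String restBase, String jobId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(
                        URI.create(restBase + "/jobs/" + jobId + "/checkpoints/config"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // args[0]: REST base URL of the JobManager, args[1]: job id (both caller-supplied).
        String body = fetchCheckpointConfig(args[0], args[1]);
        System.out.println(extractMode(body).orElse("mode field not found"));
    }
}
```

Because this queries the running job, it reports what the deployed job actually
uses, which helps to rule out the case where the job was not upgraded after the
code change.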