Hi, Able to read *.gz files from an s3 folder. I want to *get the 1st gz file* from the s3 folder and then sort only the 1st gz file into an Ordered Map as below and get the orderedMap.*getFirstKey() as a 1st event timestamp*. I want to then *pass this 1st event timestamp to all TaskManagers along with a single current time* as an epoch Time.
final DataStreamSource<String> stringDataStreamSource = env.readTextFile(s3Folder); final SingleOutputStreamOperator<Map<String, Map<String, Object>>> orderedMapOutput = stringDataStreamSource.map(new MapFunction<String, Map<String, Map<String, Object>>>() { @Override public Map<String, Map<String, Object>> map(String jsonStr) throws Exception { logger.info("record written:{}", jsonStr); //this shows the proper json string from within the gz file properly Map<String, Object> resultMap = fromJson(jsonStr);//deserialize json //sort by event_timestamp Map<String, Map<String, Object>> orderedMap = new TreeMap<>(); if (resultMap != null) { Object eventTsObj = resultMap.get(EVENT_TIMESTAMP); if (eventTsObj != null) { String eventTS = (String) eventTsObj; orderedMap.put(eventTS, resultMap); } } else { logger.warn("Could not deserialize:{}", jsonStr); } return orderedMap; } }); TIA,