Re: Is spark not good for ingesting into updatable databases?
Hi Jorn, Just want to check if you got a chance to look at this problem. I couldn't figure out any reason on why this is happening. Any help would be appreciated. -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Is spark not good for ingesting into updatable databases?
Hi Jorn, Thanks for your kind reply. I do accept that there might be something in the code. Any help would be appreciated. To give you some insights, I checked the source of the message in kafka if it has been repeated twice. But, I could only find it once. Also, it would have been convincing if all the messages are duplicated instead of only few. Please find below my source code & also a snapshot of the message that is getting duplicated in the entire logs: JavaDStream> prePepForMappedJsonStream = stream.map(new Function>() { Map mappedJson = null; @Override public Map call(String inputJsonMessage) { try { if(StringUtils.length(inputJsonMessage) != 2) { mappedJson = new HashMap<>(); StopWatch watch = StopWatchSingleton.instance(); watch.reset();watch.start(); logger.info("Transformation-1 Start:{} & Input Message is: {}", LocalDateTime.now(),inputJsonMessage); JsonToMapPrePepTransformer instance = new JsonToMapPrePepTransformer(StringUtils.join(store.getName().toLowerCase(),"_yellowbrick")); mappedJson = instance.transformJsonToMap(inputJsonMessage); watch.stop(); logger.info("Transformation-1 End:{}, Elapsed:{} & OutputMessage is: {}", LocalDateTime.now(), watch.getTime(), mappedJson); } } catch (Exception e) { logger.error("",e); } } return mappedJson; } }); JavaDStream> transformedStream = prePepForMappedJsonStream.map(new Function, Map>() { Map resultMap = null; @Override public Map call(Map readyToTransformMap) throws Exception { if(readyToTransformMap != null) { StopWatch watch = StopWatchSingleton.instance(); watch.reset();watch.start(); logger.info("Transformation-2 Start:{} & Input Message is: {}", LocalDateTime.now(),readyToTransformMap); resultMap = new HashMap<>(); resultMap = YBEDFormatter.init(StringUtils.join(store.getName().toLowerCase(),"_yellowbrick"), readyToTransformMap); watch.stop(); logger.info("Transformation-2 End:{}, Elapsed:{} & OutputMessage is: {}", LocalDateTime.now(), watch.getTime(), resultMap); } return resultMap; } }); JavaDStream kafkaPreIngestStream = transformedStream.map(new Function, ResultMapHolder>() { ResultMapHolder resultMapBean = null; @Override public ResultMapHolder call(Map finalTransformedMap) throws Exception { try { if(finalTransformedMap != null) { StopWatch watch = StopWatchSingleton.instance(); watch.reset();watch.start(); logger.info("Transformation-3 Start:{} & Input Message is: {}", LocalDateTime.now(),finalTransformedMap); resultMapBean = MapToArrayTransformerForYBIngestion.instance().transformMapToOrderedArrayOfValues(finalTransformedMap, tableColumns); watch.stop(); logger.info("Transformation-3 End:{}, Elapsed:{} & OutputMessage is: {}", LocalDateTime.now(), watch.getTime(), Arrays.toString(resultMapBean.getOutputRow())); } } catch (Exception e) { logger.error("",e); } return resultMapBean; } }); Please observe the
Re: Is spark not good for ingesting into updatable databases?
Do you have some code that you can share? Maybe it is something in your code that unintentionally duplicates it? Maybe your source (eg the application putting it on Kafka?)duplicates them already? Once and only once processing needs to be done end to end. > Am 27.10.2018 um 02:10 schrieb ravidspark : > > Hi All, > > My problem is as explained, > > Environment: Spark 2.2.0 installed on CDH > Use-Case: Reading from Kafka, cleansing the data and ingesting into a non > updatable database. > > Problem: My streaming batch duration is 1 minute and I am receiving 3000 > messages/min. I am observing a weird case where, in the map transformations > some of the messages are being reprocessed more than once to the downstream > transformations. Because of this I have been seeing duplicates in the > downstream insert only database. > > It would have made sense if the reprocessing of the message happens for the > entire task in which case I would have assumed the problem is because of the > task failure. But, in my case I don't see any task failures and only one or > two particular messages in the task will be reprocessed. > > Everytime I relaunch the spark job to process kafka messages from the > starting offset, it would dup the exact same messages all the time > irrespective of number of relaunches. > > I added the messages that are getting duped back to kafka at a different > offset to see if I would observe the same problem, but this time it won't > dup. > > Workaround for now: > As a workaround for now, I added a cache at the end before ingestion into DB > which gets updated processed event and thus making sure it won't be > reprocessed again. > > > My question here is, why am I seeing this weird behavior(only one particular > message in the entire batch getting reprocessed again)? Is there some > configuration that would help me fix this problem or is this a bug? > > Any solution apart from maintaining a cache would be of great help. > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Is spark not good for ingesting into updatable databases?
Hi All, My problem is as explained, Environment: Spark 2.2.0 installed on CDH Use-Case: Reading from Kafka, cleansing the data and ingesting into a non updatable database. Problem: My streaming batch duration is 1 minute and I am receiving 3000 messages/min. I am observing a weird case where, in the map transformations some of the messages are being reprocessed more than once to the downstream transformations. Because of this I have been seeing duplicates in the downstream insert only database. It would have made sense if the reprocessing of the message happens for the entire task in which case I would have assumed the problem is because of the task failure. But, in my case I don't see any task failures and only one or two particular messages in the task will be reprocessed. Everytime I relaunch the spark job to process kafka messages from the starting offset, it would dup the exact same messages all the time irrespective of number of relaunches. I added the messages that are getting duped back to kafka at a different offset to see if I would observe the same problem, but this time it won't dup. Workaround for now: As a workaround for now, I added a cache at the end before ingestion into DB which gets updated processed event and thus making sure it won't be reprocessed again. My question here is, why am I seeing this weird behavior(only one particular message in the entire batch getting reprocessed again)? Is there some configuration that would help me fix this problem or is this a bug? Any solution apart from maintaining a cache would be of great help. -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ - To unsubscribe e-mail: user-unsubscr...@spark.apache.org