Re: Is spark not good for ingesting into updatable databases?

2018-10-30 Thread ravidspark
Hi Jorn,

Just wanted to check if you got a chance to look at this problem. I couldn't
figure out any reason why this is happening. Any help would be
appreciated.






Re: Is spark not good for ingesting into updatable databases?

2018-10-27 Thread ravidspark
Hi Jorn,

Thanks for your kind reply. I accept that there might be something in the
code. Any help would be appreciated.

To give you some insight, I checked whether the message was already duplicated
at the source in Kafka, but I could only find it once. Also, it would have been
more convincing if all of the messages were duplicated instead of only a few.
Please find below my source code and a snapshot of the message that is
getting duplicated in the logs:




// Transformation 1: raw JSON string from Kafka -> Map<String, Object>
JavaDStream<Map<String, Object>> prePepForMappedJsonStream =
        stream.map(new Function<String, Map<String, Object>>() {

            Map<String, Object> mappedJson = null;

            @Override
            public Map<String, Object> call(String inputJsonMessage) {
                try {
                    if (StringUtils.length(inputJsonMessage) != 2) {
                        mappedJson = new HashMap<>();
                        StopWatch watch = StopWatchSingleton.instance();
                        watch.reset();
                        watch.start();
                        logger.info("Transformation-1 Start:{} & Input Message is: {}",
                                LocalDateTime.now(), inputJsonMessage);
                        JsonToMapPrePepTransformer instance = new JsonToMapPrePepTransformer(
                                StringUtils.join(store.getName().toLowerCase(), "_yellowbrick"));
                        mappedJson = instance.transformJsonToMap(inputJsonMessage);
                        watch.stop();
                        logger.info("Transformation-1 End:{}, Elapsed:{} & OutputMessage is: {}",
                                LocalDateTime.now(), watch.getTime(), mappedJson);
                    }
                } catch (Exception e) {
                    logger.error("", e);
                }
                return mappedJson;
            }
        });

// Transformation 2: pre-prepared map -> Yellowbrick-formatted map
JavaDStream<Map<String, Object>> transformedStream =
        prePepForMappedJsonStream.map(new Function<Map<String, Object>, Map<String, Object>>() {

            Map<String, Object> resultMap = null;

            @Override
            public Map<String, Object> call(Map<String, Object> readyToTransformMap)
                    throws Exception {
                if (readyToTransformMap != null) {
                    StopWatch watch = StopWatchSingleton.instance();
                    watch.reset();
                    watch.start();
                    logger.info("Transformation-2 Start:{} & Input Message is: {}",
                            LocalDateTime.now(), readyToTransformMap);
                    resultMap = new HashMap<>();
                    resultMap = YBEDFormatter.init(
                            StringUtils.join(store.getName().toLowerCase(), "_yellowbrick"),
                            readyToTransformMap);
                    watch.stop();
                    logger.info("Transformation-2 End:{}, Elapsed:{} & OutputMessage is: {}",
                            LocalDateTime.now(), watch.getTime(), resultMap);
                }
                return resultMap;
            }
        });

// Transformation 3: formatted map -> ordered array of column values for ingestion
JavaDStream<ResultMapHolder> kafkaPreIngestStream =
        transformedStream.map(new Function<Map<String, Object>, ResultMapHolder>() {

            ResultMapHolder resultMapBean = null;

            @Override
            public ResultMapHolder call(Map<String, Object> finalTransformedMap)
                    throws Exception {
                try {
                    if (finalTransformedMap != null) {
                        StopWatch watch = StopWatchSingleton.instance();
                        watch.reset();
                        watch.start();
                        logger.info("Transformation-3 Start:{} & Input Message is: {}",
                                LocalDateTime.now(), finalTransformedMap);
                        resultMapBean = MapToArrayTransformerForYBIngestion.instance()
                                .transformMapToOrderedArrayOfValues(finalTransformedMap, tableColumns);
                        watch.stop();
                        logger.info("Transformation-3 End:{}, Elapsed:{} & OutputMessage is: {}",
                                LocalDateTime.now(), watch.getTime(),
                                Arrays.toString(resultMapBean.getOutputRow()));
                    }
                } catch (Exception e) {
                    logger.error("", e);
                }
                return resultMapBean;
            }
        });
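
The actual ingestion step is not part of the snippet above. For illustration
only, a sink of this kind might look roughly like the sketch below; the table
name, column list, JDBC URL and the Object[] return type of getOutputRow() are
placeholders/assumptions, not the real ones:

// Illustrative sketch only: batched JDBC insert per partition.
// Requires java.sql.Connection, java.sql.DriverManager, java.sql.PreparedStatement.
kafkaPreIngestStream.foreachRDD(rdd -> {
    rdd.foreachPartition(rows -> {
        try (Connection conn =
                     DriverManager.getConnection("jdbc:postgresql://db-host:5432/ingest")) {
            // placeholder statement; the real column list must match getOutputRow()
            PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO events (col1, col2, col3) VALUES (?, ?, ?)");
            while (rows.hasNext()) {
                ResultMapHolder row = rows.next();
                if (row == null) {
                    continue; // the map() above returns null when parsing fails
                }
                Object[] values = row.getOutputRow(); // assumed to be Object[]
                for (int i = 0; i < values.length; i++) {
                    ps.setObject(i + 1, values[i]);
                }
                ps.addBatch();
            }
            ps.executeBatch();
        }
    });
});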



Please observe the

Re: Is spark not good for ingesting into updatable databases?

2018-10-27 Thread Jörn Franke
Do you have some code that you can share?

Maybe it is something in your code that unintentionally duplicates the messages?

Maybe your source (e.g. the application putting them on Kafka?) duplicates them
already?
Once-and-only-once processing needs to be done end to end.
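
In code terms that usually means an idempotent write plus committing the Kafka
offsets only after the output has succeeded. A rough sketch (not tested) with the
spark-streaming-kafka-0-10 direct stream, where stream is assumed to be the
JavaInputDStream from KafkaUtils.createDirectStream and writeToDatabase is only
a placeholder:

// Commit offsets back to Kafka only after the batch has been persisted.
// Needs org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange, CanCommitOffsets}.
stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    writeToDatabase(rdd);   // placeholder; the write itself must be idempotent

    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});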

> On 27.10.2018 at 02:10, ravidspark wrote:
> 
> Hi All,
> 
> My problem is as explained below.
> 
> Environment: Spark 2.2.0 installed on CDH
> Use-Case: Reading from Kafka, cleansing the data and ingesting it into a
> non-updatable database.
> 
> Problem: My streaming batch duration is 1 minute and I am receiving 3000
> messages/min. I am observing a weird case where, in the map transformations,
> some of the messages are reprocessed more than once by the downstream
> transformations. Because of this I have been seeing duplicates in the
> downstream insert-only database.
> 
> It would have made sense if the reprocessing happened for the entire task, in
> which case I would have assumed the problem was a task failure. But in my case
> I don't see any task failures, and only one or two particular messages in the
> task get reprocessed.
> 
> Every time I relaunch the Spark job to process the Kafka messages from the
> starting offset, it duplicates exactly the same messages, irrespective of the
> number of relaunches.
> 
> I added the messages that are getting duplicated back to Kafka at a different
> offset to see whether I would observe the same problem, but this time they were
> not duplicated.
> 
> Workaround for now:
> As a workaround, I added a cache in front of the DB ingestion which is updated
> with every processed event, making sure the same event is not ingested again.
> 
> 
> My question here is: why am I seeing this weird behavior (only one particular
> message in the entire batch getting reprocessed)? Is there some configuration
> that would help me fix this problem, or is this a bug?
> 
> Any solution apart from maintaining a cache would be of great help.
> 
> 
> 
> 




Is spark not good for ingesting into updatable databases?

2018-10-26 Thread ravidspark
Hi All,

My problem is as explained below.

Environment: Spark 2.2.0 installed on CDH
Use-Case: Reading from Kafka, cleansing the data and ingesting it into a
non-updatable database.

Problem: My streaming batch duration is 1 minute and I am receiving 3000
messages/min. I am observing a weird case where, in the map transformations,
some of the messages are reprocessed more than once by the downstream
transformations. Because of this I have been seeing duplicates in the
downstream insert-only database.
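
For reference, a minimal skeleton of this kind of pipeline (1-minute
micro-batches with the spark-streaming-kafka-0-10 direct stream) is sketched
below; the broker address, topic name and group id are placeholders, not the
real settings:

// Illustrative skeleton only: 1-minute micro-batches reading a Kafka topic.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class KafkaToDbJob {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("kafka-to-db");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(1));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "broker1:9092");     // placeholder
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "ingest-group");               // placeholder
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);

        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("input-topic"), kafkaParams));

        // each record value is the raw JSON string fed into the cleansing map() chain
        JavaDStream<String> stream = kafkaStream.map(ConsumerRecord::value);

        // ... cleansing transformations and DB ingestion go here ...

        jssc.start();
        jssc.awaitTermination();
    }
}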

It would have made sense if the reprocessing happened for the entire task, in
which case I would have assumed the problem was a task failure. But in my case
I don't see any task failures, and only one or two particular messages in the
task get reprocessed.

Every time I relaunch the Spark job to process the Kafka messages from the
starting offset, it duplicates exactly the same messages, irrespective of the
number of relaunches.

I added the messages that are getting duplicated back to Kafka at a different
offset to see whether I would observe the same problem, but this time they were
not duplicated.

Workaround for now:
As a workaround, I added a cache in front of the DB ingestion which is updated
with every processed event, making sure the same event is not ingested again.
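
The shape of that workaround is roughly the following sketch; ProcessedKeyCache
and eventKey() are only stand-ins for the actual cache and message key, not real
class or method names, and readyToIngest stands for the DStream that feeds the
DB insert:

// Rough sketch of the dedup filter placed in front of the DB insert.
JavaDStream<String> deduped = readyToIngest.filter(message -> {
    String key = eventKey(message);             // placeholder: whatever uniquely identifies the event
    return ProcessedKeyCache.putIfAbsent(key);  // hypothetical atomic call; true only on first occurrence
});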


My question here is: why am I seeing this weird behavior (only one particular
message in the entire batch getting reprocessed)? Is there some configuration
that would help me fix this problem, or is this a bug?

Any solution apart from maintaining a cache would be of great help.


