VERSION: 5.2.2. Both the Elasticsearch cluster and the Spark driver (elasticsearch-hadoop) are on this version.
I found an interesting issue with the Elasticsearch-Hadoop driver: when I read data from Elasticsearch, I got duplicate rows, but the total number of results stayed the same. In other words, some rows were replaced by duplicates of other rows.
For example, I’m running a simple Spark job that exports a query matching 50m rows from an Elasticsearch cluster. The result still has 50m rows, but maybe 20m of them are duplicates. So in the end I get only 30m unique rows, and 20m rows were replaced by those duplicates.
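A quick way to confirm this kind of silent duplication is to compare the row count with the number of distinct document IDs. The sketch below is plain Python and only shows the arithmetic. It assumes each exported row carries its Elasticsearch _id (for example by enabling es.read.metadata in the connector), and duplicate_report is a hypothetical helper, not part of any library. On a real 50m-row export you would run the same comparison in Spark, e.g. count() versus a distinct count of the id column.

```python
from collections import Counter


def duplicate_report(doc_ids):
    """Summarize how many exported rows repeat a document already seen.

    doc_ids: iterable of Elasticsearch _id values, one per exported row
    (hypothetical input shape; assumes the _id was exported with each row).
    """
    counts = Counter(doc_ids)
    total = sum(counts.values())
    unique = len(counts)
    return {
        "total_rows": total,
        "unique_docs": unique,
        # rows that are extra copies of a document already counted once
        "extra_copies": total - unique,
        # distinct documents that show up more than once
        "duplicated_docs": sum(1 for c in counts.values() if c > 1),
    }


# Toy export: 10 rows, but only 7 distinct documents.
rows = ["a", "b", "c", "c", "d", "e", "e", "f", "g", "g"]
print(duplicate_report(rows))
# {'total_rows': 10, 'unique_docs': 7, 'extra_copies': 3, 'duplicated_docs': 3}
```

In the scenario above, total_rows would be 50m, unique_docs 30m and extra_copies 20m.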
I didn’t find any open issue about this in the elasticsearch-hadoop repository on Elastic’s GitHub page, but I did manage to find a workaround.
es.input.max.docs.per.partition: <number of docs in your largest shard>
This is a new parameter added in ES-Hadoop 5.x, which relies on the sliced scroll feature of Elasticsearch 5. It makes the connector slice its reads from Elasticsearch, and it is basically the batch size: the maximum number of documents per input partition. The default value is 100K.
So when a shard has more than 100K docs, the Spark driver splits that shard into multiple sliced reads.
I’m guessing there’s a bug in how the slices are calculated, so that somehow the same documents get read more than once.
So the workaround is basically to set this number big enough to avoid sliced reads altogether.
That means the value needs to be at least the number of docs in your largest shard: then every shard fits in a single partition and no shard gets sliced.
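To make the slicing arithmetic concrete, here is a rough Python sketch. It assumes each shard is split into ceil(docs / limit) partitions, which is my approximation of the connector’s behavior rather than its actual code; the exact rounding may differ, but a limit at least as large as the biggest shard keeps every shard in a single partition either way. The shard counts are made up (in practice you could read them from GET _cat/shards), and partitions_per_shard, no_slicing_limit and the index name are hypothetical.

```python
import math


def partitions_per_shard(shard_doc_counts, max_docs_per_partition):
    """Approximate number of input partitions (scroll slices) per shard.

    Assumption: a shard larger than the limit is split into
    ceil(docs / limit) slices; smaller shards are read in one partition.
    """
    return [
        max(1, math.ceil(docs / max_docs_per_partition))
        for docs in shard_doc_counts
    ]


def no_slicing_limit(shard_doc_counts):
    """Smallest limit at which no shard is sliced: the largest shard's doc count."""
    return max(shard_doc_counts)


# Hypothetical per-shard doc counts (e.g. taken from GET _cat/shards).
shards = [80_000, 250_000, 1_200_000]

print(partitions_per_shard(shards, 100_000))  # 5.x default -> [1, 3, 12]

limit = no_slicing_limit(shards)
print(partitions_per_shard(shards, limit))  # -> [1, 1, 1]

# Setting to pass to the reader, e.g. (hypothetical index/type name):
# spark.read.format("org.elasticsearch.spark.sql").options(**es_conf).load("my-index/my-type")
es_conf = {"es.input.max.docs.per.partition": str(limit)}
print(es_conf)
```

With the 100K default, the two larger shards are split into several slices; with the limit set to the largest shard’s count, every shard is read in one partition. The trade-off is less read parallelism per shard.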
I love Elasticsearch and the Hadoop driver. I just haven’t had time to reproduce this issue at a smaller scale and report it, so this bug is unconfirmed through official channels. What I can say is that the workaround worked for me: no more duplicates.