Elastic Search Update by Query with Ingest Pipeline

I should start with a confession. I love Elasticsearch. It feels fantastic to feed it any document and then search for the precise terms in it. However, when I started creating and managing a cluster at work, it was a nightmare. I was a beginner, so many concepts took time to sink in. Most of the tutorials I found online were for versions 1.x-2.x, but ES was already at 5.x and had just started the 6.x line. The initial cluster was created on version 2.x, and upgrading it to the latest 6.x release required upgrading first to 5.x and then finally to 6.x.

Since we were only about a month into feeding data, the volume was not that large, and the upgrades took 2-3 days (due to the reindexing involved). Our cluster is both index and query heavy. However, indexing is on a best-effort basis: the overall pipeline retries indexing four to five times at various intervals. So as I was reindexing and upgrading my cluster, new data was still coming into the old cluster. Once the reindexing completed, I did another reindexing with only the new data. Any data that arrived during this second reindex was simply dropped. Because each indexing pipeline has around five tries at various intervals, the dropped data eventually made its way into the new system.

The above process is easy when the overall data volume in the cluster is small. Our cluster now holds around 15 TB of data spanning a year and supports three different systems. Reindexing these days is out of the question; most of the indices are hot and in continuous use.

Recently, I needed to update a field in a couple of indices with a different value. The good thing was that it did not require a mapping change; the field type stayed the same. Only the data needed to be updated.

As an example, assume the field name to be “jobIdentifier”. Most of the documents had this value as alphanumeric. However, some of the documents had entries like 213452-2-12932-<Alphanumeric>. The requirement was to normalize these values to just <Alphanumeric>. Simple enough, right?

Updating an Elasticsearch field value using regular expressions

The straightforward way to do this is with regular expressions using the Painless scripting language. From the Elasticsearch docs:

Painless is a simple, secure scripting language designed specifically for use with Elasticsearch.

Using the Painless scripting language for the above requirement, we can write the regular expression like this:

json
{
    "script": {
        "lang": "painless",
        "source": "ctx._source.jobIdentifier = /[0-9]+-[0-9]+-[0-9]+-(.*)/.matcher(ctx._source.jobIdentifier).replaceAll('$1')"
    }
}

In this regular expression, we match the “jobIdentifier” field against the pattern [0-9]+-[0-9]+-[0-9]+-(.*) and replace the value with the captured group (.*), thereby removing the three leading numbers along with their hyphen separators. So a value like 213452-2-12932-Some_random_value becomes Some_random_value.
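
For reference, this is roughly how such a script could be applied to the index (assumed here to be my_index, the same index used later in this post) through the Update by Query API. This is only a sketch: it assumes regular expressions are enabled in Painless, which, as explained below, is not the default.

bash
curl -X POST "localhost:9200/my_index/_update_by_query" -H 'Content-Type: application/json' -d'
{
    "script": {
        "lang": "painless",
        "source": "ctx._source.jobIdentifier = /[0-9]+-[0-9]+-[0-9]+-(.*)/.matcher(ctx._source.jobIdentifier).replaceAll(\"$1\")"
    },
    "query": {
        "regexp": {
            "jobIdentifier": "[0-9]+-[0-9]+-[0-9]+-(.*)"
        }
    }
}
'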

However, because badly behaved regular expressions can cause stack overflow errors in Java, regular expressions in Painless scripting are disabled by default in the latest versions of Elasticsearch (they can be re-enabled with the script.painless.regex.enabled setting). https://github.com/elastic/elasticsearch/pull/20427 The pull request and its related links describe the reasoning in great detail, and it convinced me to look into other solutions. I did not want to enable regular expressions in scripting unless absolutely necessary.

Ingest Pipeline and Update by Query

Ingest nodes in Elasticsearch pre-process documents before they are indexed. By default, every node in a cluster is an ingest node; dedicated ingest nodes can be set up if the pre-processing is resource-intensive. A pipeline defines the pre-processing: it contains a "description" and a list of "processors", a series of steps that are carried out on each document. Processors can use Logstash grok patterns and scripting languages such as Painless.
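
As a quick check, the roles of the nodes in a cluster (including whether they can act as ingest nodes) can be listed with the _cat/nodes API; this is just an illustrative one-liner.

bash
# An "i" in the node.role column marks an ingest-capable node
curl "localhost:9200/_cat/nodes?v&h=name,node.role"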

First let us see the pipeline definition.

bash
curl -X PUT "localhost:9200/_ingest/pipeline/my-pipeline-id" -H 'Content-Type: application/json' -d'
{
    description": "Used to update in place - Remove the alphanumeric ids from jobIdentifier in my_index index",
    "processors": [
        {
            "grok": {
                "field": "jobIdentifier",
                "patterns": [
                    "%{NEWFIELDIDS:eID}-%{NEWFIELDIDS:eSEQID}-%{NEWFIELDIDS:e2ID}-%{GREEDYDATA:REST}"
                ],
                "pattern_definitions": {
                    "NEWFIELDIDS": "[0-9]+"
                },
                "ignore_missing": true
            }
        },
        {
            "script": {
                "lang": "painless",
                "inline": "jobIdentifier = ctx.REST"
            }
        },
        {
            "script": {
                "lang": "painless",
                "inline": "ctx.remove('eID'); ctx.remove('eSEQID'); ctx.remove('e2ID');  ctx.remove('REST')"
            }
        }
    ]
}
'

The documents go through three processors in the pipeline.

bash
"grok": {
        "field": "jobIdentifier",
        "patterns": [
            "%{NEWFIELDIDS:eID}-%{NEWFIELDIDS:eSEQID}-%{NEWFIELDIDS:e2ID}-%{GREEDYDATA:REST}"
        ],
        "pattern_definitions": {
            "NEWFIELDIDS": "[0-9]+"
        },
        "ignore_missing": true
    }

Match the field “jobIdentifier”. We define a custom pattern named “NEWFIELDIDS” that matches one or more digits. If the field value has the shape “Numbers-Numbers-Numbers-Other data”, each number group is captured into a new field (eID, eSEQID, e2ID). The rest of the value, up to the end, is matched with the built-in “GREEDYDATA” grok pattern from Logstash and captured into the field “REST”. At the end of this step, the processor has added four new fields (eID, eSEQID, e2ID, REST) to the document. Setting “ignore_missing” to true makes the processor exit quietly for documents that do not have a “jobIdentifier” field at all, instead of failing the whole pipeline; documents whose values do not match the pattern are filtered out anyway by the query in the Update by Query request below.
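
To make this concrete, after the grok processor a document whose “jobIdentifier” was 213452-2-12932-Some_random_value (the value from the earlier example) would look roughly like this, with the four temporary fields added alongside the original one.

json
{
    "jobIdentifier": "213452-2-12932-Some_random_value",
    "eID": "213452",
    "eSEQID": "2",
    "e2ID": "12932",
    "REST": "Some_random_value"
}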

bash
"script": {
        "lang": "painless",
        "inline": "jobIdentifier = ctx.REST"
    }

Now we run a Painless script that assigns the value of the newly created “REST” field back to the “jobIdentifier” field. Note the ctx prefix: inside an ingest processor script, document fields are accessed directly as ctx.<field>, rather than through ctx._source as in an Update by Query script.

bash
"script": {
        "lang": "painless",
        "inline": "ctx.remove('eID'); ctx.remove('eSEQID'); ctx.remove('e2ID');  ctx.remove('REST')"
    }

Finally, we remove the fields created in step 1 (eID, eSEQID, e2ID, REST), as we do not need them anymore.
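
As an aside, this cleanup step does not strictly need a script: Elasticsearch ships a built-in "remove" processor, so a sketch of an equivalent step would be to chain one remove processor per temporary field in the "processors" array (recent versions also accept a list of fields in a single processor).

json
{ "remove": { "field": "eID" } },
{ "remove": { "field": "eSEQID" } },
{ "remove": { "field": "e2ID" } },
{ "remove": { "field": "REST" } }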

The explanation looks enormous, but we basically matched the pattern, updated the field, and then removed the extra new fields created.
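
Before pointing the pipeline at live data, it is worth dry-running it with the Simulate Pipeline API against a sample document, along these lines (adding verbose=true to the URL shows the document after each individual processor).

bash
curl -X POST "localhost:9200/_ingest/pipeline/my-pipeline-id/_simulate" -H 'Content-Type: application/json' -d'
{
    "docs": [
        {
            "_source": {
                "jobIdentifier": "213452-2-12932-Some_random_value"
            }
        }
    ]
}
'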

Finally, using the newly created pipeline (identified by “my-pipeline-id”), we run an Update by Query on the index. The query selects the documents whose “jobIdentifier” matches the requisite pattern and passes them through the pipeline. Multiple indices can be listed in the URL to update documents across all of them. We set wait_for_completion to false, so the request returns immediately (with a task ID) and the update runs in the background.

bash
curl -X POST "localhost:9200/my_index/_update_by_query?pipeline=my-pipeline-id&wait_for_completion=false" -H 'Content-Type: application/json' -d'
{
    "query": {
        "bool": {
            "must": [
                {
                    "regexp": {
                        "jobIdentifier": "[0-9]+-[0-9]+-[0-9]+-(.*)"
                    }
                }
            ]
        }
    }
}
'
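
Since wait_for_completion=false makes the request return a task ID instead of blocking, the progress of the background update can be followed through the Tasks API, for example:

bash
# List running update-by-query tasks with per-task progress details
curl "localhost:9200/_tasks?detailed=true&actions=*byquery"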

Conclusion

We might not need expensive regular expressions in scripting to match and update fields in Elasticsearch. Instead of enabling them blindly in our cluster, we can first try an ingest pipeline with Update by Query, which covers the most common cases.