Aggregate event data using Ingest Processor
Learn how to aggregate event data using Ingest Processor to optimize data flow and reduce log volume by processing partial aggregations in batches.
You can create a pipeline that aggregates your incoming event data to reduce the volume of raw logs being sent to your destination.
For example, when working with network flow record data, you can sum the total bytes transferred from each source IP address. You can then send those aggregated totals as individual events to your data destination to be indexed.
Data is aggregated using the stats command in an SPL2 pipeline. See the stats command documentation in the SPL2 Search Reference manual for more information.
Differences between ingest-time aggregations and search-time aggregations
Because data streams through Ingest Processor pipelines continuously, the stats command performs partial aggregations in batches. Each batch can contain one or more events, depending on the rate of data flow through your pipelines and on your data source configurations. If aggregable data spans multiple batches, each batch is aggregated independently, and each partial aggregation is routed to your pipeline destination as its own event. For example, if events from the same server arrive in two separate batches, the destination receives two partial sums for that server rather than one combined total. To retrieve a complete and finalized aggregation of your data, you must run a search using the stats command in the Splunk platform after your data is indexed. See Processing aggregations at search-time for more information.
The following table lists the optional arguments of the stats command and indicates where each one can be used. An empty cell means the argument is not supported in that context.

| Optional argument | Can be used in an Ingest Processor pipeline | Can be used in Splunk platform search |
|---|---|---|
| all-num | Yes | |
| batch_id | Yes | |
| batch_time | Yes | |
| by-clause | Yes | Yes |
| delim | Yes | |
| instance_id | Yes | |
| partitions | Yes | |
| span | Yes | Yes |
Create a pipeline to aggregate your data
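As an illustration of the summary pattern described in the following section, here is a minimal sketch of a pipeline that aggregates network flow records by source IP address. The field names source_ip and bytes, and the $source and $destination placeholders, are illustrative assumptions rather than values from a real pipeline:

// Minimal sketch: sum the bytes transferred from each source IP address.
// Fields other than source_ip and total_bytes are dropped from the output.
$pipeline = | from $source
| stats sum(bytes) AS total_bytes BY source_ip
| into $destination;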
Managing aggregations
Aggregation patterns
Different types of aggregations can be created with the stats command in an Ingest Processor pipeline, depending on how many fields you want your indexed events to retain.

- Summary pattern: Choosing a limited number of fields to aggregate in your pipeline filters out the unselected fields and outputs a distilled summary of your data. See Create a pipeline to aggregate your data for an example of the summary pattern.
- Passthrough pattern: If you want to retain all fields from your events and still reduce the overall number of events being indexed, add every field that you want to keep to the BY <clause> section of your pipeline statement. See the following example of the passthrough pattern in practice.
Suppose that your pipeline receives the following events:

| server_name | server_ip | file_requested | bytes_out | sourcetype |
|---|---|---|---|---|
| web-01 | 10.1.2.3 | /index.html | 500 | web |
| web-01 | 10.1.2.3 | /index.html | 500 | web |
| web-02 | 10.1.2.4 | /css/main.css | 1200 | web |
The following partial pipeline statement aggregates bytes_out while retaining every other field by listing all of them in the BY clause:

... | eval orig_sourcetype=sourcetype
| stats sum(bytes_out) AS bytes_out BY server_name, server_ip, file_requested, orig_sourcetype
| rename orig_sourcetype AS sourcetype
| eval _raw=json_delete(_raw, "orig_sourcetype")
Assuming that all three events are processed in the same batch, the aggregated output events look like the following:

| _raw | server_name | server_ip | file_requested | bytes_out | sourcetype |
|---|---|---|---|---|---|
| {"server_name": "web-01", "server_ip": "10.1.2.3", "file_requested": "/index.html", "bytes_out": 1000} | web-01 | 10.1.2.3 | /index.html | 1000 | web |
| {"server_name": "web-02", "server_ip": "10.1.2.4", "file_requested": "/css/main.css", "bytes_out": 1200} | web-02 | 10.1.2.4 | /css/main.css | 1200 | web |
Processing aggregations at search-time
Now that you have applied a pipeline that aggregates your data in batches and sends those batches to your Splunk platform destination, you can finalize the batched aggregations by running a search in the Splunk platform using the stats command. Each statistical function that you run in an Ingest Processor pipeline has a corresponding SPL search that finalizes the aggregation. See the following examples for these search statements.
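As a quick reference, the examples that follow pair each pipeline aggregation with the search that finalizes it:

| Pipeline aggregation | Finalizing search aggregation |
|---|---|
| sum(<field>) | sum of the partial sums |
| count() | sum of the partial counts |
| min(<field>) | min of the partial minimums |
| max(<field>) | max of the partial maximums |
| average (as sum and count) | sum of partial sums divided by sum of partial counts |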
The following search-time examples assume that your pipeline received these events:

| server_name | server_ip | file_requested | bytes_out | sourcetype |
|---|---|---|---|---|
| web-01 | 10.1.2.3 | /index.html | 500 | web |
| web-01 | 10.1.2.3 | /images/logo.png | 8500 | web |
| web-02 | 10.1.2.4 | /css/main.css | 1200 | web |
Sum: the following pipeline statement sums bytes_out for each server and labels the output events with a summary sourcetype:

... | stats sum(bytes_out) AS bytes_out BY server_name
| eval sourcetype="web:summary"
The aggregated events sent to the index look like the following:

| server_name | bytes_out | _raw | sourcetype |
|---|---|---|---|
| web-01 | 9000 | {"server_name": "web-01", "bytes_out": 9000} | web:summary |
| web-02 | 1200 | {"server_name": "web-02", "bytes_out": 1200} | web:summary |
To finalize the sum at search time, sum the partial sums:

index=<myindex> sourcetype="web:summary" | stats sum(bytes_out) BY server_name
Count: the following pipeline statement counts the events from each server:

... | stats count() AS event_count BY server_name
| eval sourcetype="web:summary"
The aggregated events look like the following:

| server_name | event_count | _raw | sourcetype |
|---|---|---|---|
| web-01 | 2 | {"server_name": "web-01", "event_count": 2} | web:summary |
| web-02 | 1 | {"server_name": "web-02", "event_count": 1} | web:summary |
Because each indexed event carries a partial count, finalize by summing the partial counts rather than counting events:

index=<myindex> sourcetype="web:summary" | stats sum(event_count) BY server_name
Minimum: the following pipeline statement finds the smallest bytes_out value for each server:

... | stats min(bytes_out) AS min_bytes_out BY server_name
| eval sourcetype="web:summary"

The aggregated events look like the following:

| server_name | min_bytes_out | _raw | sourcetype |
|---|---|---|---|
| web-01 | 500 | {"server_name": "web-01", "min_bytes_out": 500} | web:summary |
| web-02 | 1200 | {"server_name": "web-02", "min_bytes_out": 1200} | web:summary |

To finalize, take the minimum of the partial minimums:

index=<myindex> sourcetype="web:summary" | stats min(min_bytes_out) BY server_name
Maximum: the following pipeline statement finds the largest bytes_out value for each server:

... | stats max(bytes_out) AS max_bytes_out BY server_name
| eval sourcetype="web:summary"

The aggregated events look like the following:

| server_name | max_bytes_out | _raw | sourcetype |
|---|---|---|---|
| web-01 | 8500 | {"server_name": "web-01", "max_bytes_out": 8500} | web:summary |
| web-02 | 1200 | {"server_name": "web-02", "max_bytes_out": 1200} | web:summary |

To finalize, take the maximum of the partial maximums:

index=<myindex> sourcetype="web:summary" | stats max(max_bytes_out) BY server_name
Average: an average of partial averages is not a valid average, so the pipeline emits partial sums and counts instead, and the search divides the totals. The following pipeline statement emits both values for each server:

... | stats sum(bytes_out) AS bytes_out, count(bytes_out) AS event_count BY server_name
| eval sourcetype="web:summary"
The aggregated events look like the following:

| server_name | bytes_out | event_count | _raw | sourcetype |
|---|---|---|---|---|
| web-01 | 9000 | 2 | {"server_name": "web-01", "bytes_out": 9000, "event_count": 2} | web:summary |
| web-02 | 1200 | 1 | {"server_name": "web-02", "bytes_out": 1200, "event_count": 1} | web:summary |
To finalize, sum the partial sums and counts, and then compute the average:

index=<myindex> sourcetype="web:summary"
| stats sum(bytes_out) AS bytes_out, sum(event_count) AS event_count BY server_name
| eval bytes_avg=bytes_out/event_count
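For the sample events shown earlier, this search yields bytes_avg = 9000 / 2 = 4500 for web-01 and 1200 / 1 = 1200 for web-02.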
Aggregations with time spans: the following example groups events into one-minute windows. Suppose that your pipeline receives these events:

| _time | server_name | server_ip | file_requested | bytes_out | sourcetype |
|---|---|---|---|---|---|
| 2025-01-01 12:00:05 | web-01 | 10.1.2.3 | /index.html | 500 | web |
| 2025-01-01 12:01:22 | web-01 | 10.1.2.3 | /images/logo.png | 8500 | web |
| 2025-01-01 12:00:28 | web-02 | 10.1.2.4 | /css/main.css | 1200 | web |
... | stats sum(bytes_out) AS bytes_out BY span(_time, 1m), server_name
| eval sourcetype="web:summary"
The aggregated events look like the following:

| _time | server_name | bytes_out | _raw | sourcetype |
|---|---|---|---|---|
| 2025-01-01 12:00:00 | web-01 | 500 | {"server_name": "web-01", "bytes_out": 500, "_time": 1735732800} | web:summary |
| 2025-01-01 12:01:00 | web-01 | 8500 | {"server_name": "web-01", "bytes_out": 8500, "_time": 1735732860} | web:summary |
| 2025-01-01 12:00:00 | web-02 | 1200 | {"server_name": "web-02", "bytes_out": 1200, "_time": 1735732800} | web:summary |
To finalize, group by both the time bucket and the server:

index=<myindex> sourcetype="web:summary" | stats sum(bytes_out) BY _time, server_name
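Note that after the span aggregation, _time in each output event is the start of its one-minute window (for example, the 12:00:05 event lands in the 12:00:00 bucket), so grouping by _time at search time combines partial sums from the same window.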