Saturday, February 7, 2015

Gaining visibility into ad-hoc data exports from Splunk

Along the same lines as understanding how your users are using Splunk, and dovetailing into whether users are abusing their access to data in Splunk, is taking a periodic look at what data they might be exporting. By that I mean exporting to a CSV or maybe generating a PDF of a dashboard. Ideally you would like to know, for example, if this Mark character has exported something: what format was it in, what was the search, and how many records or results were included in the download.
There are a couple of challenges:


  1. Search results (result count, events searched, etc) are in the internal search completion logs while the search parameters are in the internal search initiation logs.
  2. Those logs are separate from the web logs that indicate someone has performed one of the export actions.
  3. The various Splunk commands you might use to merge all of this data have some limitations that you will need to keep in mind. For example, using a subsearch to get something like search_id and pass it to a parent search is limited by default to a 60s runtime and/or 10k results. A join or append is limited to a 60s runtime and/or 50k records, again by default (see the limits.conf sketch just after this list). If you have even a moderately sized deployment, over the course of several days you have thousands of searches being run once you factor in your users, scheduled content, and internal Splunk processes. I suppose one way to mitigate this is to review the detection query output every day, but that seems a little too frequent to me.
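
For reference, those defaults live in limits.conf on the search head. A minimal sketch of the relevant stanzas, with the values I understand the defaults to be (raise them with care, since bigger subsearches cost memory and search time):

[subsearch]
# cap on results and runtime for plain subsearches
maxout = 10000
maxtime = 60

[join]
# cap on results and runtime for the subsearch side of a join
subsearch_maxout = 50000
subsearch_maxtime = 60

If I remember correctly, append also takes maxout and maxtime arguments directly in the search string if you only want to bump the limits for a single query.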

Let's break the first two items down. To collapse searches relative to item 1, you could perform a search like this:

index=_audit "action=search" "info=granted" OR "info=completed" | table apiStartTime apiEndTime user search_id result_count user search | stats values(*) as * by search_id


The linking factor between the two sets of logs is the search_id, so that is the obvious split-by field. The values(*) bit is just a fancy shortcut that lets you get the values from the other fields without having to write them all out. It is helpful if you want to add an additional field like event_count or scan_count. Note that one reason we are able to get away with this is that each of the fields in question only has one piece of data. If there were multiple pieces you could still do the values(*), but you might need to work an mvexpand into your query.
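
As a minimal example of that shortcut, pulling in event_count and scan_count (assuming those fields are populated in your search completion audit events) is just a matter of adding them to the table before the stats:

index=_audit "action=search" "info=granted" OR "info=completed" | table apiStartTime apiEndTime user search_id result_count event_count scan_count search | stats values(*) as * by search_id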

The next piece is to figure out what the web logs look like for things like an ad-hoc PDF or CSV export. Through some trial and error I came up with the following indicators in Splunk's web logs. For example, this first one shows me having Splunk export a dashboard to PDF:


10.10.10.1 - runals [05/Feb/2015:07:29:32.127 -0500] "POST /splunk/web/en-US/splunkd/__raw/services/pdfgen/render HTTP/1.1" 200 95604 "https://splunk:443/splunk/web/en-US/app/CTCM/ct_change_overview?earliest=0&latest=" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.94 Safari/537.36" - 54d3622c207f538c367c10 24135ms


This second one is me having Splunk export one of the dashboard panels as a CSV:


10.10.10.1 - runals [05/Feb/2015:07:34:21.046 -0500] "GET /splunk/web/en-US/api/search/jobs/runals__runals__CTCM__search2_1423139369.829/results?isDownload=true&timeFormat=%25FT%25T.%25Q%25%3Az&maxLines=0&count=0&filename=testtesttest&outputMode=csv&spl_ctrl-limit=unlimited&spl_ctrl-count=1000 HTTP/1.1" 200 - "https://splunk:443/splunk/web/en-US/app/CTCM/ct_change_overview?earliest=0&latest=" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.94 Safari/537.36" - 54d3634d0b7f53a05cecd0 88ms


So there are a couple of things to note. The first is that both have the app context and dashboard the user is in (even if that is just the "search" app, which isn't represented here). The second is that only the CSV export has a search_id. The query I used, then, is as follows.


index=_internal sourcetype=splunk_web_access isDownload=true OR uri=*/render | rex "/app/(?<app>[^/]+)/(?<dashboard>[^\?\"]+)" | rex "jobs/(?<search_id>[^\/]+)" | eval outputMode= if(isnotnull(outputMode), outputMode, "pdf of dashboard") | eval filename = if(isnull(filename) AND NOT like(outputMode,"pdf%"), search_id, filename) | table _time host user search_id outputMode filename app dashboard


In order to stitch this together, I'm first going to grab the search_ids for the CSV (and similar) exports in a subsearch and pass them to the parent search to get the search particulars, then run an append to pull in the user, app context, and export types. You could use a join here, but the PDFs aren't going to have a search_id. This was also my first time using random(), which was cool.


index=_audit "action=search" "info=granted" OR "info=completed" [search index=_internal sourcetype=splunk_web_access isDownload=true | rex field=uri "jobs\/(?<search_id>[^\/]+)" | eval search_id = "'".search_id."'" | fields search_id] | table apiStartTime apiEndTime user search_id result_count user search | stats values(*) as * by search_id | rex field=search_id "'(?<search_id>[^']+)" | append [search index=_internal sourcetype=splunk_web_access isDownload=true OR uri=*/render  | rex "/app/(?<app>[^/]+)/(?<dashboard>[^\?\"]+)" | rex "jobs/(?<search_id>[^\/]+)" | eval outputMode= if(isnotnull(outputMode), outputMode, "pdf") | eval filename = if(isnull(filename) AND NOT like(outputMode,"pdf%"), search_id, filename) | table _time host user search_id outputMode app dashboard filename] | eval search_id = if(isnotnull(search_id), search_id, "-".random()) | rename _time as time | stats values(*) as * by search_id | table time user host app dashboard outputMode filename result_count apiStartTime apiEndTime search | sort time | convert ctime(time) | rename apiStartTime as query_start apiEndTime as query_end | eval filename = urldecode(filename)


Of course, in talking with one of our student workers we did come up with another use case to cover: what if someone used a command like outputlookup to write data to a local CSV? There are a couple of issues with this. The first is that someone would have to have access to the server to actually grab the file; the second is that this is a common way to generate dynamically updated lookups. I've gone ahead and baked this in, but it is sort of kludgy and I don't want to go through the process of linking these to the Splunk web logs or including searches that are scheduled. Again, the point of this exercise is to look at the ad-hoc stuff. To assuage my guilt at not getting the app/dashboard context, I've put the search_id in the dashboard field, allowing someone to look it up if they so desire.


index=_audit "action=search" "info=granted" OR "info=completed" [search index=_internal sourcetype=splunk_web_access isDownload=true | rex field=uri "jobs\/(?<search_id>[^\/]+)" | eval search_id = "'".search_id."'" | fields search_id] | table apiStartTime apiEndTime user search_id result_count user search | stats values(*) as * by search_id | rex field=search_id "'(?<search_id>[^']+)" | append [search index=_internal sourcetype=splunk_web_access isDownload=true OR uri=*/render  | rex "/app/(?<app>[^/]+)/(?<dashboard>[^\?\"]+)" | rex "jobs/(?<search_id>[^\/]+)" | eval outputMode= if(isnotnull(outputMode), outputMode, "pdf") | eval filename = if(isnull(filename) AND NOT like(outputMode,"pdf%"), search_id, filename) | table _time host user search_id outputMode app dashboard filename] | append [search index=_audit "action=search" "info=granted" OR "info=completed" [search index=_audit "action=search" "info=granted" OR "info=completed" outputlookup OR outputcsv OR outputtext NOT "outputlookup OR outputcsv OR outputtext" savedsearch_name="" | fields search_id] | rename _time as time | table time apiStartTime apiEndTime user search_id result_count user search host | stats values(*) as * by search_id | rex field=search "(?<outputMode>output(lookup|csv|text))\s+(?<filename>\S+)" | eval app = "search_id =" | eval dashboard = search_id | stats min(time) as time1 by outputMode host search_id apiEndTime apiStartTime filename result_count search user app dashboard]| eval search_id = if(isnotnull(search_id), search_id, "-".random()) | eval time = coalesce(time1,_time) | fields - time1 _time | stats values(*) as * by search_id | table time user host app dashboard outputMode filename result_count apiStartTime apiEndTime search | sort time | convert ctime(time) | rename apiStartTime as query_start apiEndTime as query_end | eval filename = urldecode(filename) 


In my decently large environment the query takes all of about 15 seconds to run over a 7-day period.
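
If you would rather not run this review by hand, a sketch of a savedsearches.conf stanza to schedule it weekly over the trailing week might look like the following; the stanza name and cron schedule are just placeholders, and the search key would hold the full stitched query from above.

[Ad-hoc export review]
enableSched = 1
# Monday at 0600, looking back over the previous week
cron_schedule = 0 6 * * 1
dispatch.earliest_time = -7d@h
dispatch.latest_time = now
# search = <the full stitched query from above>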
