Get Dynatrace json dumps from threaded Python 3.
Today was a boring day so I was doing anything to make it go faster. I ended up redoing some Python code to get Dynatrace data. It didn’t need to be threaded but I was bored! It’s cold, windy, and dark here. Blah! Maybe someone will find this useful?
#!/usr/bin/python3 from concurrent.futures import ThreadPoolExecutor import requests # import datetime import os import json import logging import traceback def init_logging(log_path): log_format = '%(asctime)s - %(levelname)s - %(message)s' logging.basicConfig(filename=log_path, level=logging.INFO, format=log_format) def dynatrace_request(args): '''Request/Reponse''' uri,output,token,cert = args result = None headers = {'accept':'application/json','Authorization':'Api-Token %s' % token} try: response = requests.get(uri, headers=headers, verify=cert)'Request status: %s' % response.status_code) result = json.dumps(response.json()) with open(output,'w') as output: output.write(result) except: err_msg = traceback.format_exc() logging.error(err_msg) return result if __name__ in '__main__': DAYS = 30 # Days to current date. HOST = 'https://SOME_SERVER/e/SOME-ID' TOKEN = 'YOUR TOKEN' CERT = False # Path to cert. Will work with false but undsafe. LOG = r'c:/temp/dyna.log' # Increment timestamp. const_day = 86400 days_ts = DAYS * const_day adj_dt = int( today_dt = int( start_dt = adj_dt - days_ts # Convert to string. start_dt = '%s000' % start_dt today_dt = '%s000' % today_dt # Make sure we have a directory to put our data. base_path = os.path.split(LOG)[0] if not os.exists(base_path): os.mkdir(base_path) # Dictionary of tasks with output paths since we are dumping to json to be picked up by an Ajax routine on some dashboard. tasks = { "c:/temp/counts.json":r"{0}/api/v1/problem/status".format(HOST), "c:/temp/apdex.json":r"{0}/api/v1/userSessionQueryLanguage/table?query=select%20apdexCategory%2C%20application%2C%20count(*)%20FROM%20useraction%20where%20usersession.userType%20%3D%20'REAL_USER'%20GROUP%20BY%20apdexCategory%2Capplication&startTimestamp={1}&endTimestamp={2}&explain=false".format(HOST,start_dt,today_dt) } # init logging. init_logging(LOG)'Started') # Get json. work_count = len(tasks) with ThreadPoolExecutor(max_workers=work_count) as executor: for outp,url in tasks.items(): executor.submit(dynatrace_request, (url, outp, TOKEN, CERT))'Finished')