#!/usr/bin/python3 # -*- coding: UTF-8 -*- import time, busio, board, sys, fileinput, os, argparse import numpy as np import adafruit_veml6070 import adafruit_tsl2561 sys.path.append('/home/ghz/wxlib') import wxlib as wx wx_dir = "/home/ghz/light_wx" plot_d = wx_dir+'/plots/' dat_fname='light_levels' def conv_qs(data): if data == b'??' or data == '??': return np.nan else: return data # part of the ongoing effort to keeps the two sensors from locking up when it's bright out. def uv_read(uv): if(uv == 0): return np.nan uv.wake() # we get 0 if we don't wait first. time.sleep(0.9) # first reads are kinda bogus after wake. uvr = uv.uv_raw uvr = uv.uv_raw uvr = uv.uv_raw uv.sleep() return uvr if __name__ == "__main__": parser = argparse.ArgumentParser(description='Record, graph, and upload ambient light levels.') parser.add_argument('-d', dest = 'debug', action = 'store_true', help='turn on debug chars') parser.add_argument('-D', dest = 'moredebug', action = 'store_true', help='turn on more debug chars') parser.add_argument('-k', dest = 'killuv', action = 'store_true', help='kill the uv sensor') args = parser.parse_args() i2c = busio.I2C(board.SCL, board.SDA) # the adafruit lib will take the following integration times # https://learn.adafruit.com/adafruit-veml6070-uv-light-sensor-breakout?view=all # VEML6070_HALF_T ~62.5ms # VEML6070_1_T ~125ms # VEML6070_2_T ~250ms # VEML6070_4_T ~500ms uv_int_tarray = [62.5, 125, 250, 500] uv_dings_array = ['VEML6070_HALF_T', 'VEML6070_1_T', 'VEML6070_2_T', 'VEML6070_4_T'] uv_a_idx = 0 uv_int_time = uv_dings_array[uv_a_idx] # sometimes our uv sensor goes away, and only powering it off for a minute or two # and restarting it will bring it back, which is hard to do remotely. tmep hack. if(args.killuv): uv_sens = 0 else: uv_sens = adafruit_veml6070.VEML6070(i2c, uv_int_time) vis_ir = adafruit_tsl2561.TSL2561(i2c) # integration time (0 = 13.7ms, 1 = 101ms, 2 = 402ms) int_time = 0 wx.proof_dir(plot_d) # init the tsl sensor vis_ir.enabled = True time.sleep(1) vis_ir.gain = 0 vis_ir.integration_time = int_time # first reads are often kinda bogus try: if(args.debug): print('$', end='', flush=True) lux = vis_ir.lux bb = vis_ir.broadband ir = vis_ir.infrared except: print("inital read failed.") exit() vis_ir.enabled = False t_vals = {} vals = {} counts = {} for i in range(0,5): t_vals[i] = vals[i] = counts[i] = 0 while True: uvt = uv_read(uv_sens) if(isinstance(uvt, (float, int))): if((uvt == 0 and uv_a_idx == 0) or uvt == 255): # try one more time if(args.debug): print('j', uvt, end='', flush=True) uvt = uv_read(uv_sens) if(args.debug): print('k', uvt, end='', flush=True) if(isinstance(uvt, (float, int)) and uvt != 255): if(uvt < 1.0 and uv_a_idx < 3): if(args.debug): print('^', uvt, end='', flush=True) uv_a_idx += 1 uv_sens.integration_time = uv_dings_array[uv_a_idx] if(uvt > 100 and uv_a_idx > 0): if(args.debug): print('v', uvt, end='', flush=True) uv_a_idx -= 1 uv_sens.integration_time = uv_dings_array[uv_a_idx] t_vals[1] = uv_read(uv_sens) vis_ir.enabled = True # we get bogus values in the dark if we don't wait a bit first time.sleep(0.5) # run a couple reads after wake up to shake the sleep out. t_vals[0] = vis_ir.lux time.sleep(0.1) t_vals[0] = vis_ir.lux time.sleep(0.1) t_vals[0] = vis_ir.lux gain = vis_ir.gain int_time = vis_ir.integration_time bbt = vis_ir.broadband irt = vis_ir.infrared if(args.moredebug): print("(", gain, ",", int_time, ")->", end='', flush=True) if(isinstance(gain, (float, int)) and isinstance(bbt, (float, int)) and isinstance(irt, (float, int))): if(bbt < 4.0 and irt < 2.0): if(gain < 0.5): vis_ir.gain = 1 if(gain > 0.5): if(int_time < 2): int_time += 1 if(args.debug): print('+', end='', flush=True) vis_ir.integration_time = int_time if(bbt > 100 and irt > 100): if(gain > 0.5): vis_ir.gain = 0 if(gain < 0.5): if(int_time > 0): int_time -= 1 if(args.debug): print('-', end='', flush=True) vis_ir.integration_time = int_time if(args.moredebug): print("(", gain, ",", int_time, ")", end='', flush=True) gain = vis_ir.gain # we might have just changed this, recheck it. if(isinstance(gain, (float, int))): # we don't always get types that make sense. if(gain > 0.5): t_vals[2] = vis_ir.broadband / 16.0 t_vals[3] = vis_ir.infrared / 16.0 else: t_vals[2] = vis_ir.broadband t_vals[3] = vis_ir.infrared # do the last lux read with the final gain setting t_vals[0] = vis_ir.lux vis_ir.enabled = False t_vals[4] = float(wx.pi_temp_read()) / 1000 for i in range(0,5): if(isinstance(t_vals[i], (float, int))): vals[i] += t_vals[i] counts[i] += 1 if(args.debug): print('.', end='', flush=True) # wait till we have at least 64 counts for one of the more reliable sensors. if(counts[4] >= 64 or counts[1] >= 64): # XXX div by 0 workaround for wonky lux part. if(vals[0] == 0 and counts[0] == 0): if(args.debug): print('!', end='', flush=True) counts[0] = np.nan try: dbg_step = 0 lux = "%.2f" % (vals[0] / counts[0]) dbg_step = 1 tuvr = (vals[1] / counts[1]) dbg_step = 2 tbb = (vals[2] / counts[2]) dbg_step = 3 tir = (vals[3] / counts[3]) dbg_step = 4 pi_t = "%.2f" % (vals[4] / counts[4]) dbg_step = 5 except: # this can (and does) happen if z.B. we never get a valid count for the lux # toss everything and start main loop over again. print("computing averages failed: ", dbg_step) if(args.debug): print("t_vals: ", t_vals) print("vals: ", vals) print("counts: ", counts, flush=True) for i in range(0,5): t_vals[i] = vals[i] = counts[i] = 0 continue if(args.debug): print('*', end='', flush=True) # scaling here appears linear with integration time, unlike in the lux calc??? if(int_time == 0): tbb *= 402/13.7 tir *= 402/13.7 if(int_time == 1): tbb *= 402/101 tir *= 402/101 # scaling for uv integration time tuvr *= 500 / uv_int_tarray[uv_a_idx] # if the lux model inputs are 0, we can resonably expect 0 on the output if(vals[2] == 0 and vals[3] == 0): if(args.debug): print('_', end='', flush=True) lux = 0.00 # it's gotten better, but we still get some wobbles in the lux calc. if(tbb > 5000 and tir > 5000 and float(lux) < 10): if(args.debug): print('#', end='', flush=True) lux = np.nan # the broadband seems to wrap around 110,000 counts. the IR sensor seems more immune to this. # XXX extra cheesy, prly not very valid attempt at unwrapping sensor data if(tbb < 40000 and tir > 25000): if(args.debug): print('w', end='', flush=True) tbb += 117406.65 # highest recorded value so far # first stab at cheesy unwrapping the uv data. uv_wrap = 1965.5 # 1965.5 is the highest value we've seen yet. if(tuvr < 400 and tir > 40000): if(args.debug): print('x', end='', flush=True) tuvr += uv_wrap if(tuvr < 650 and tir > 55000): if(args.debug): print('y', end='', flush=True) tuvr += uv_wrap if(tuvr < 800 and tir > 65000): if(args.debug): print('z', end='', flush=True) tuvr += uv_wrap # treat these sensors more like one pixle cameras than broken temp sensors # when it reach saturation, indicate saturation, not nan. lux_sat = 32893 # highest value so far. XXX figure out what actual saturation is if(isinstance(lux, (float, int)) and np.isnan(lux) and tbb > 80000 and tir > 40000): lux = lux_sat # more weird saturation conditions if(isinstance(lux, (float, int)) and lux < 1000 and tbb > 80000 and tir > 40000): lux = lux_sat if(args.debug): print('o', end='', flush=True) bb = "%.2f" % (tbb) ir = "%.2f" % (tir) uvr = "%.2f" % (tuvr) ts = time.strftime("%Y%m%d%H%M%S", time.gmtime()) dat = "%s\tlux: %s\tUVA: %s\tBB: %s\tIR: %s\tGain: %s\tPi_temp: %s\tvis_int_time: %s\tuv_int_time: %s\n" % \ (ts, lux, uvr, bb, ir, gain, pi_t, int_time, uv_dings_array[uv_a_idx]) wx.write_out_dat_stamp(ts, dat_fname, dat, wx_dir) # it's solar powered. i'm in n.eu. it doesn't always enjoy clean shutdowns, # and then garbage in the dat file prevents the graphing from working on restart. # try a sync, and see if that helps. how much garbage should we write to a closed file? if(args.debug): print('s', end='', flush=True) os.sync() if(args.debug): print('g', end='', flush=True) try: # run perl script to grab last 48h of data, and gnuplot os.system('/home/ghz/light_wx/light_sync') except: print("light_sync failed.") if(args.debug and not args.moredebug): print('r', flush=True) if(args.debug and args.moredebug): print('r', end='', flush=True) try: # wx7_sync defined in ~/.ssh/config with host, user, key, etc. os.system('/usr/bin/rsync --timeout=60 -ur ' + wx_dir + '/* wx7_sync:/wx7/') except: print("rsync failed.") if(args.moredebug): print(counts, flush=True) for i in range(0,5): t_vals[i] = vals[i] = counts[i] = 0