Coverage for main.py: 24%

121 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-17 21:11 +0000

"""
Main.py

Author: Andreas Zimmermann
Date: 2025-03-15
"""

6 

7import time 

8import threading 

9import datetime 

10import logging 

11import os 

12import json 

13import random 

14import queue 

15import requests 

16 

17#logging.basicConfig( format="%(asctime)s - %(levelname)s - %(message)s",filename="logs.log", encoding= 'utf-8',level=logging.INFO,) 

18 

19# Konfigurationsdatei laden 

def load_config(filename: str) -> dict:
    """Load the JSON configuration file.

    The path stored in the ``WORKING_ENV`` environment variable takes
    precedence; ``filename`` is only used when that variable is not set.

    Args:
        filename: Fallback path of the JSON configuration file.

    Returns:
        The parsed JSON content (expected to be a JSON object), or an empty
        dict when the file does not exist or does not contain valid JSON.

    Raises:
        AssertionError: If ``filename`` is ``None``.
    """
    # Raised explicitly instead of via ``assert`` so the check survives
    # ``python -O`` while callers still see the same exception type.
    if filename is None:
        raise AssertionError("filename must not be None")

    # Read the config path from the environment, falling back to ``filename``.
    env_path = os.environ.get("WORKING_ENV", filename)
    # Both values are strings, so they are formatted with %s; %d made the
    # logging module emit a formatting error instead of the message.
    logging.info("env_path %s", env_path)
    logging.info("filename %s", filename)
    try:
        with open(env_path, encoding='utf-8') as file:
            return json.load(file)
    except (FileNotFoundError, json.JSONDecodeError) as error:
        logging.error("Fehler beim Laden der Konfigurationsdatei: %s", error)
        return {}

34 

def get_enviroment_filename() -> str:
    """Return the configuration file name to use.

    Returns:
        The value of the ``WORKING_ENV`` environment variable, or
        ``'config.json'`` when that variable is not set.
    """
    # os.environ.get never raises for a missing key, so the former bare
    # ``except:`` was dead code; the default argument covers the unset case.
    return os.environ.get("WORKING_ENV", 'config.json')

44 

def put_led_on_off(turn_on: bool, led_id: int, session, apr_url_led: str) -> bool:
    """Switch the LED on or off by POSTing to the LED endpoint.

    Args:
        turn_on: ``True`` sends status ``'on'``, ``False`` sends ``'off'``.
        led_id: Identifier sent as ``id`` in the JSON payload.
        session: Object providing ``post(url, json=..., timeout=...)``.
        apr_url_led: URL of the LED endpoint.

    Returns:
        ``True`` only when the response status code is exactly 200;
        ``False`` for any other status or when the request fails.
    """
    payload = {'id': led_id, 'status': 'on' if turn_on else 'off'}
    try:
        # Only the network call and the HTTP status check can raise a
        # request error, so nothing else lives inside the try body.
        response = session.post(apr_url_led, json=payload, timeout=5)
        response.raise_for_status()
    except requests.RequestException as error:
        logging.error("Fehler beim Senden des LED-Status: %s", error)
        return False
    # NOTE(review): other 2xx codes (201/204) count as failure here; kept
    # as-is for backward compatibility — confirm against the LED service.
    return response.status_code == 200

61 

def get_led_state(led_id: int, session, apr_url_led: str) -> None:
    """Query the LED endpoint and log the reported state.

    Args:
        led_id: Identifier sent as the ``id`` query parameter.
        session: Object providing ``get(url, params=..., timeout=...)``.
        apr_url_led: URL of the LED endpoint.

    Returns:
        None. The decoded JSON body is written to the log at INFO level;
        failures are logged at ERROR level and not re-raised.
    """
    try:
        response = session.get(apr_url_led, params={'id': led_id}, timeout=5)
        response.raise_for_status()
        logging.info("LED Status: %s", response.json())
    # ValueError covers a response body that is not valid JSON, matching the
    # error handling in get_solar_power, so a bad body cannot escape to the caller.
    except (requests.RequestException, ValueError) as error:
        logging.error("Fehler beim Abrufen des LED-Status: %s", error)

70 

def get_solar_power(power_queue: "queue.Queue[int]", requestinterval: float, session, api_url: str) -> None:
    """Poll the solar API in an endless loop and queue every power reading.

    Each iteration requests ``api_url``, puts the ``currentPower`` value of
    the JSON response (0 when absent) on ``power_queue``, logs the reading
    and then sleeps for ``requestinterval`` seconds. Request and data errors
    are logged and the loop continues.

    Args:
        power_queue: Queue that receives the power readings.
        requestinterval: Pause between two requests, in seconds.
        session: Object providing ``get(url, timeout=...)``.
        api_url: URL of the power endpoint.

    Returns:
        Never returns normally; intended to run in a daemon thread.
    """
    while True:
        try:
            logging.info("Sende API-Anfrage...")
            response = session.get(api_url, timeout=5)
            response.raise_for_status()
            data = response.json()
            # Convert to seconds (the API value is presumably in milliseconds).
            timestamp = data.get('timestamp', 0) / 1000
            datum = datetime.datetime.fromtimestamp(timestamp) if timestamp else "Ungültiges Datum"
            current_power = data.get('currentPower', 0)
            power_queue.put(current_power)
            logging.info("ID: %s, Zeit: %s, Leistung: %d W", data.get('id', 'N/A'), datum, current_power)
        # TypeError/AttributeError cover malformed payloads (e.g. a null
        # timestamp or a non-object body); uncaught, they would end this
        # thread and leave a consumer blocked on the queue.
        except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError) as error:
            logging.error("Fehler bei API-Anfrage oder Datenverarbeitung: %s", error)
            # The placeholder was missing, which caused a logging formatting
            # error whenever DEBUG output was enabled.
            logging.debug("url %s", api_url)
        time.sleep(requestinterval)

88 

def _configure_logging(log_level_str: str) -> None:
    """Send root-logger output to ``logs.log`` and the console at the given level."""
    log_level = getattr(logging, log_level_str, logging.INFO)
    # getattr can also match non-level attributes of the logging module
    # (e.g. BASIC_FORMAT); fall back to INFO instead of letting setLevel raise.
    if not isinstance(log_level, int):
        log_level = logging.INFO

    logger = logging.getLogger()
    logger.setLevel(log_level)

    log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

    # File handler for the log file.
    file_handler = logging.FileHandler("logs.log", encoding="utf-8")
    file_handler.setFormatter(log_formatter)
    file_handler.setLevel(log_level)

    # Stream handler for console output.
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(log_formatter)
    console_handler.setLevel(log_level)

    # Drop any handlers that are already installed before adding the new ones.
    if logger.hasHandlers():
        logger.handlers.clear()

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)


def main() -> None:
    """Run the power-controlled consumer loop.

    Loads the configuration, sets up logging, then repeatedly reads the
    current power (random values in simulation mode, otherwise readings
    queued by the ``get_solar_power`` thread) and switches the consumer LED
    with hysteresis: on above ``powerTresholdOn``, off below
    ``powerTresholdOff``, unchanged in between. Runs until interrupted with
    Ctrl+C.
    """
    # Load configuration.
    config_var = load_config('config.json')
    mock_consumer: bool = config_var.get('mock_consumer', True)
    mock_powerplant_sim = config_var.get('mock_powerplant_sim', True)

    power_threshold_on: int = config_var.get('powerTresholdOn', 500)
    power_threshold_off: int = config_var.get('powerTresholdOff', 300)

    powerplant_id: int = config_var.get('powerplant_id', 17)
    api_url: str = config_var.get('powerplant_url', 'http://localhost:8080/power')
    # NOTE(review): the id is appended without a separator, so the default
    # URL becomes ".../power17" — confirm this matches the API.
    api_url += str(powerplant_id)
    consumer_url: str = config_var.get('consumer_url', 'http://127.0.0.1:5000/led')

    logging_interval: int = config_var.get('loggingInterval', 10)
    led_id_rpi: int = config_var.get('consumer_led_id', 17)  # GPIO on RPI
    led_update_interval: int = config_var.get('LedUpdateInterval', 5)

    # str() guards against a non-string logLevel in the config (e.g. 20),
    # which would otherwise crash on .upper().
    log_level_str = str(config_var.get("logLevel", "INFO")).upper()
    print(f"Log-Level: {log_level_str}")
    _configure_logging(log_level_str)

    # One shared session so connections are reused across requests.
    session = requests.Session()

    power_queue: "queue.Queue[int]" = queue.Queue()
    led_cmd: bool = False

    # Power source: simulation or the real power plant API.
    if mock_powerplant_sim:
        logging.info("Starte Datenlogger POWERPLANT SIM...")
    else:
        logging.info("Starte den Datenlogger-Thread...")
        datalogger_thread = threading.Thread(
            target=get_solar_power,
            args=(power_queue, logging_interval, session, api_url),
            daemon=True,
        )
        datalogger_thread.start()

    logging.info("Datalogger gestartet")
    try:
        while True:
            # Current power comes from the simulation or from the polling thread.
            if mock_powerplant_sim:
                current_power = random.randint(1, 800)
            else:
                current_power = power_queue.get()
            logging.info("Aktuelle Leistung: %d W", current_power)

            # Hysteresis controller: between the two thresholds the previous
            # command is kept.
            if current_power > power_threshold_on:
                led_cmd = True
            elif current_power < power_threshold_off:
                led_cmd = False

            # Consumer: only log in simulation mode, otherwise call the LED API.
            if mock_consumer:
                logging.info("LED Zustand : %d", led_cmd)
            else:
                put_led_on_off(led_cmd, led_id_rpi, session, consumer_url)
                get_led_state(led_id_rpi, session, consumer_url)

            time.sleep(led_update_interval)

    except KeyboardInterrupt:
        logging.info("Programm wird beendet...")
    finally:
        # Release pooled connections on every exit path.
        session.close()

181 

# Start the program only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()