Coverage for main.py: 24%
121 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-17 21:11 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-17 21:11 +0000
1""""
2Main.py
3Autor: Andreas Zimmermann
4Datum: 2025-03-15
5"""
7import time
8import threading
9import datetime
10import logging
11import os
12import json
13import random
14import queue
15import requests
17#logging.basicConfig( format="%(asctime)s - %(levelname)s - %(message)s",filename="logs.log", encoding= 'utf-8',level=logging.INFO,)
19# Konfigurationsdatei laden
def load_config(filename: str) -> dict:
    """Load the configuration from a JSON file.

    The path stored in the ``WORKING_ENV`` environment variable takes
    precedence; ``filename`` is only used when that variable is not set.

    Args:
        filename: Fallback path of the JSON configuration file. Must not be
            ``None``.

    Returns:
        The parsed JSON content (normally a dict), or an empty dict when the
        file does not exist or does not contain valid JSON.

    Raises:
        AssertionError: If ``filename`` is ``None``.
    """
    # Kept as an assert so existing callers still see AssertionError.
    assert filename is not None
    # Resolve the path of the configuration file.
    env_path = os.environ.get("WORKING_ENV", filename)
    # Both values are strings: they have to be logged with %s. The former %d
    # placeholder made the logging module fail while formatting the record.
    logging.info("env_path %s", env_path)
    logging.info("filename %s", filename)
    try:
        with open(env_path, encoding='utf-8') as file:
            return json.load(file)
    except (FileNotFoundError, json.JSONDecodeError) as error:
        logging.error("Fehler beim Laden der Konfigurationsdatei: %s", error)
        return {}
def get_enviroment_filename() -> str:
    """Return the name of the configuration file to use.

    Returns:
        The value of the ``WORKING_ENV`` environment variable, or
        ``'config.json'`` when that variable is not set.
    """
    # os.environ.get() never raises for a missing key, so the former bare
    # ``except:`` could only hide unrelated errors such as KeyboardInterrupt.
    return os.environ.get("WORKING_ENV", 'config.json')
def put_led_on_off(turn_on: bool, led_id: int, session, apr_url_led: str) -> bool:
    """Post the requested LED state to the consumer endpoint.

    Args:
        turn_on: Truthy requests status ``'on'``, falsy requests ``'off'``.
        led_id: Identifier sent as ``id`` in the JSON payload.
        session: Object offering ``post(url, json=..., timeout=...)``.
        apr_url_led: URL the request is posted to.

    Returns:
        ``True`` only when the response status code is exactly 200. ``False``
        for any other non-error status, or when the request fails with a
        ``requests.RequestException`` (which is logged).
    """
    payload = {'id': led_id, 'status': 'on' if turn_on else 'off'}
    try:
        reply = session.post(apr_url_led, json=payload, timeout=5)
        # Turns HTTP error statuses into an exception handled below.
        reply.raise_for_status()
        return reply.status_code == 200
    except requests.RequestException as error:
        logging.error("Fehler beim Senden des LED-Status: %s", error)
        return False
def get_led_state(led_id: int, session: any, apr_url_led: str) -> None:
    """Query the LED state from the consumer endpoint and log it.

    Args:
        led_id: Identifier sent as the ``id`` query parameter.
        session: Object offering ``get(url, params=..., timeout=...)``.
        apr_url_led: URL the request is sent to.

    Returns:
        None. The decoded JSON reply is written to the log at INFO level; a
        ``requests.RequestException`` is logged as an error instead.
    """
    query = {'id': led_id}
    try:
        reply = session.get(apr_url_led, params=query, timeout=5)
        reply.raise_for_status()
        logging.info("LED Status: %s", reply.json())
    except requests.RequestException as error:
        logging.error("Fehler beim Abrufen des LED-Status: %s", error)
def get_solar_power(power_queue: "queue.Queue[int]", requestinterval: float, session, api_url: str) -> None:
    """Poll the power plant API in an endless loop and publish each reading.

    Every cycle requests ``api_url``, puts the ``currentPower`` value of the
    JSON reply (0 when the key is absent) on ``power_queue``, logs the
    reading and then sleeps. Failures are logged and the loop carries on with
    the next cycle, so the function never returns normally.

    Args:
        power_queue: Queue that receives one power value per successful cycle.
        requestinterval: Pause between two cycles, passed to ``time.sleep``.
        session: Object offering ``get(url, timeout=...)``.
        api_url: URL of the power plant endpoint.
    """
    while True:
        try:
            logging.info("Sende API-Anfrage...")
            response = session.get(api_url, timeout=5)
            response.raise_for_status()
            data = response.json()
            # Convert to seconds (presumably the API reports milliseconds).
            timestamp = data.get('timestamp', 0) / 1000
            datum = datetime.datetime.fromtimestamp(timestamp) if timestamp else "Ungültiges Datum"
            current_power = data.get('currentPower', 0)
            power_queue.put(current_power)
            logging.info("ID: %s, Zeit: %s, Leistung: %d W", data.get('id', 'N/A'), datum, current_power)
        except (
            requests.RequestException,
            ValueError,
            KeyError,
            # Malformed payloads (null/str timestamp, non-dict JSON, timestamp
            # out of range) must not end the polling loop: whoever waits on
            # power_queue would then never get another value.
            TypeError,
            AttributeError,
            OverflowError,
            OSError,
        ) as error:
            logging.error("Fehler bei API-Anfrage oder Datenverarbeitung: %s", error)
            # The URL needs its own placeholder; passing it without one is a
            # logging formatting error.
            logging.debug("url %s", api_url)
        time.sleep(requestinterval)
def _configure_logging(level_name: str) -> None:
    """Send root-logger output to ``logs.log`` and the console at one level."""
    level = getattr(logging, level_name, logging.INFO)
    if not isinstance(level, int):
        # The name matched a logging attribute that is not a level
        # (e.g. "BASIC_FORMAT"); fall back instead of failing in setLevel().
        level = logging.INFO

    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    file_handler = logging.FileHandler("logs.log", encoding="utf-8")
    console_handler = logging.StreamHandler()
    for handler in (file_handler, console_handler):
        handler.setFormatter(formatter)
        handler.setLevel(level)

    root_logger = logging.getLogger()
    root_logger.setLevel(level)
    # Drop handlers installed earlier (module-level logging calls made before
    # this point add a default handler) so that records are not duplicated.
    if root_logger.hasHandlers():
        root_logger.handlers.clear()
    root_logger.addHandler(file_handler)
    root_logger.addHandler(console_handler)


def _next_led_command(current_power, led_cmd: bool, threshold_on, threshold_off) -> bool:
    """Return the new LED command; between the thresholds the old one is kept."""
    if current_power > threshold_on:
        return True
    if current_power < threshold_off:
        return False
    return led_cmd


def main() -> None:
    """Run the power-driven LED controller until interrupted with Ctrl+C.

    Reads the settings through ``load_config('config.json')``, configures
    logging, starts the polling thread unless the power plant is mocked, and
    then loops forever: obtain a power value, derive the LED command from the
    two thresholds, and either log the command (mocked consumer) or send it
    to the consumer and query the LED state.
    """
    # Load the configuration; every setting has a default.
    config_var = load_config('config.json')
    mock_consumer: bool = config_var.get('mock_consumer', True)
    mock_powerplant_sim = config_var.get('mock_powerplant_sim', True)

    # The key spelling ("Treshold") is what the configuration file uses.
    power_threshold_on: int = config_var.get('powerTresholdOn', 500)
    power_threshold_off: int = config_var.get('powerTresholdOff', 300)

    powerplant_id: int = config_var.get('powerplant_id', 17)
    api_url: str = config_var.get('powerplant_url', 'http://localhost:8080/power')
    # NOTE(review): the id is appended without a separator, so the defaults
    # give 'http://localhost:8080/power17' - confirm this is intended.
    api_url += str(powerplant_id)
    consumer_url: str = config_var.get('consumer_url', 'http://127.0.0.1:5000/led')

    logging_interval: int = config_var.get('loggingInterval', 10)
    led_id_rpi: int = config_var.get('consumer_led_id', 17)  # GPIO on RPI
    led_update_interval: int = config_var.get('LedUpdateInterval', 5)

    # Configure the logger; str() keeps a non-string "logLevel" from crashing.
    log_level_str = str(config_var.get("logLevel", "INFO")).upper()
    print(f"Log-Level: {log_level_str}")
    _configure_logging(log_level_str)

    # One shared session for all HTTP requests.
    session = requests.Session()

    power_queue: "queue.Queue[int]" = queue.Queue()
    led_cmd: bool = False

    # Source of the power readings: simulation or the real power plant.
    if mock_powerplant_sim:
        logging.info("Starte Datenlogger POWERPLANT SIM...")
    else:
        logging.info("Starte den Datenlogger-Thread...")
        datalogger_thread = threading.Thread(
            target=get_solar_power,
            args=(power_queue, logging_interval, session, api_url),
            daemon=True,
        )
        datalogger_thread.start()

    logging.info("Datalogger gestartet")
    try:
        while True:
            # Current power: random value in simulation mode, otherwise the
            # next reading from the polling thread (blocks until one arrives).
            if mock_powerplant_sim:
                current_power = random.randint(1, 800)
            else:
                current_power = power_queue.get()
            logging.info("Aktuelle Leistung: %d W", current_power)

            # On/off controller for the consumer.
            led_cmd = _next_led_command(
                current_power, led_cmd, power_threshold_on, power_threshold_off
            )

            # Consumer: only log in simulation mode, otherwise call the API.
            if mock_consumer:
                logging.info("LED Zustand : %d", led_cmd)
            else:
                put_led_on_off(led_cmd, led_id_rpi, session, consumer_url)
                get_led_state(led_id_rpi, session, consumer_url)

            time.sleep(led_update_interval)

    except KeyboardInterrupt:
        logging.info("Programm wird beendet...")
# Start the controller only when this file is executed as a script.
if __name__ == "__main__":
    main()