AtmoAssistant/app/utils.py
Evan 1d5bc48e8e
Some checks failed
SonarQube Scan / SonarQube Trigger (push) Failing after 28s
Build image - Testing / build-api-testing (push) Successful in 45s
Add /time and /date endpoints
2025-01-26 20:55:14 -05:00

204 lines
6.6 KiB
Python

from datetime import datetime, timedelta, timezone
import json
from urllib import parse
import logging
import re
import typing as t
import requests
import traceback
import phonenumbers
from phonenumbers import timezone as pntz
import pytz
from models import db, Stats, Weather, Zipcode
from config import env_OWM_KEY, env_OWM_UNITS, env_CACHE_TIME
# Reuse gunicorn's error logger so messages land in the server's error log.
logger = logging.getLogger("gunicorn.error")

# Spoken weather report; positional fields are, in order: current temperature,
# feels-like temperature, daily high, daily low, humidity percentage and the
# textual summary for the day.
weather_template = (
    "The current temperature is: {0}. "
    "The real feel temperature is: {1}. "
    "The high is: {2}. "
    "The low is: {3}. "
    "The current humidity is: {4} percent. "
    "The summary for today is: {5}."
)
def str_none(x):
    """Return ``str(x)``, mapping ``None`` to the empty string.

    Args:
        x: Any value, possibly ``None``.

    Returns:
        str: ``""`` when ``x`` is ``None``, otherwise ``str(x)``.
    """
    return "" if x is None else str(x)
def string_validator(input_str: str):
    """
    Decode a URL-encoded string and strip unsafe characters from it.

    The input is percent-decoded, then all whitespace and the characters
    ``< > " ' % ;`` are removed.

    Args:
        input_str (str): The raw, possibly percent-encoded, input. ``None``
            (e.g. a missing request parameter) is treated as invalid.

    Returns:
        The sanitized string, or None when the input is None or nothing is
        left after sanitizing.
    """
    # A missing value is simply "invalid"; don't let unquote() raise TypeError.
    if input_str is None:
        return None
    decoded_str = parse.unquote(input_str)
    # Removing characters is order-independent, so one pass over the combined
    # character class gives the same result as separate passes.
    sanitized = re.sub(r'[\s<>"\'%;]', "", decoded_str)
    return sanitized if sanitized else None
def validate_data_presence(data: t.Dict[str, t.Any], keys: list[str]) -> bool:
    """
    Check that every requested key exists in the data.

    Args:
        data (Dict[str, Any]): The JSON data to be validated.
        keys (list[str]): The keys that must all be present in the data.

    Returns:
        bool: True when every key is present (including when ``keys`` is
        empty), False as soon as one key is missing.
    """
    return all(required in data for required in keys)
def strq(str: str) -> str:
    """Return the given text wrapped in double quotes.

    Args:
        str (str): The text to quote. The name shadows the builtin ``str``
            inside this function; it is kept because it is part of the
            callable's interface.

    Returns:
        str: The text with a ``"`` prepended and appended.
    """
    quote = '"'
    return quote + str + quote
def _get_time(phone_number):
    """Build a spoken sentence with the current time for a phone number.

    Args:
        phone_number: The caller's phone number, passed to
            ``_get_phone_time`` to obtain the localized current time.

    Returns:
        str: ``"The time is HH:MM AM/PM."`` (12-hour clock), or a fixed
        error sentence if anything goes wrong.
    """
    try:
        local_now = _get_phone_time(phone_number)
        return local_now.strftime("The time is %I:%M %p.")
    except Exception as exc:
        # Never propagate: callers always receive a sentence they can read out.
        logger.error("Error in _get_time: " + str(exc))
        logger.error(traceback.format_exc())
        return "An error has occured and the time could not be retrieved."
def _get_date(phone_number):
    """Build a spoken sentence with today's date for a phone number.

    Args:
        phone_number: The caller's phone number, passed to
            ``_get_phone_time`` to obtain the localized current time.

    Returns:
        str: ``"Today is <weekday>, <month> <day>."``, or a fixed error
        sentence if anything goes wrong.
    """
    try:
        local_now = _get_phone_time(phone_number)
        return local_now.strftime("Today is %A, %B %d.")
    except Exception as exc:
        # Never propagate: callers always receive a sentence they can read out.
        logger.error("Error in _get_date: " + str(exc))
        logger.error(traceback.format_exc())
        return "An error has occured and the date could not be retrieved."
def _get_phone_time(phone_number):
    """Return the current time localized to the phone number's timezone.

    The number is parsed with "US" as the default region and validated. The
    first timezone reported for it is used. If parsing, validation or the
    timezone lookup fails, the error is logged and America/New_York is used
    instead.

    Args:
        phone_number: The phone number to derive a timezone from.

    Returns:
        datetime: The current, timezone-aware time in the selected zone.
    """
    try:
        # Parse the phone number into a PhoneNumber object
        number = phonenumbers.parse(phone_number, "US")
        if not phonenumbers.is_valid_number(number):
            raise TypeError("Number not valid")
        # Determine the timezone based on the area code or country code
        tz_str = pntz.time_zones_for_number(number)[0]
        # Resolve the zone inside the guarded section: phonenumbers can report
        # a placeholder name (e.g. "Etc/Unknown") that pytz cannot resolve,
        # and that case must also fall back to the default instead of raising.
        tz = pytz.timezone(tz_str)
    except Exception as e:
        logger.error("Error parsing phone number " + str(phone_number) + " : " + str(e))
        # Default to America/New_York if there's an error during conversion
        tz = pytz.timezone("America/New_York")
    # Get current datetime in the target timezone
    now_utc = datetime.now(timezone.utc)
    return now_utc.astimezone(tz)
def _get_weather(lat, long):
    """Build the spoken weather report for a coordinate pair.

    Fetches the weather data through ``_get_weather_json``, fills
    ``weather_template`` with rounded values, and records a ``Stats`` row
    for the coordinates before returning the report.

    Args:
        lat: Latitude, or None when the zipcode could not be resolved.
        long: Longitude, or None when the zipcode could not be resolved.

    Returns:
        str: The weather report, or a fixed error sentence when the
        coordinates are missing, the data is unavailable, or any step fails.
    """
    failure_message = "An error has occured and the weather could not be retrieved."
    try:
        if lat is None or long is None:
            return "An error has occured and the provided zipcode could not be understood."

        payload = _get_weather_json(lat, long)
        if payload is None:
            return failure_message

        report = weather_template.format(
            round(payload["current"]["temp"]),
            round(payload["current"]["feels_like"]),
            round(payload["daily"][0]["temp"]["max"]),
            round(payload["daily"][0]["temp"]["min"]),
            round(payload["current"]["humidity"]),
            payload["daily"][0]["summary"],
        )

        # Record one usage-statistics row per successfully built report.
        db.session.add(Stats(lat_long=",".join((str(lat), str(long)))))
        db.session.commit()
        return report
    except Exception as exc:
        logger.error("Error in _get_weather: " + str(exc))
        logger.error(traceback.format_exc())
        return failure_message
def _get_weather_json(lat, long, timeout=10):
    """Return OpenWeatherMap One Call data for a location, using a DB cache.

    A cached ``Weather`` row for the ``"lat,long"`` key is returned when its
    ``last_timestamp`` is no older than ``env_CACHE_TIME`` minutes. Otherwise
    the API is queried, the row is created or refreshed, and the fresh JSON
    is returned.

    Args:
        lat: Latitude of the location.
        long: Longitude of the location.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block the worker
            indefinitely.

    Returns:
        The decoded weather JSON, or None when the API responds with a
        non-200 status or the request fails (including on timeout).
    """
    url = (
        "https://api.openweathermap.org/data/3.0/onecall"
        "?lat={0}&lon={1}&exclude=alerts,minutely,hourly&units={2}&appid={3}"
    ).format(lat, long, env_OWM_UNITS, env_OWM_KEY)
    lat_long = str(lat) + "," + str(long)
    current_time = datetime.now(timezone.utc)
    try:
        w = Weather.query.filter(Weather.lat_long == lat_long).first()
        # The stored timestamp is tagged as UTC before comparing with the
        # aware "now" — presumably it is persisted as naive UTC; verify
        # against the model.
        if w and w.last_timestamp.replace(tzinfo=timezone.utc) >= current_time - timedelta(minutes=env_CACHE_TIME):
            logger.info("Weather cache hit!")
            return w.results

        logger.info("Weather cache miss!")
        # requests.Timeout is a RequestException, so it is handled below.
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            logger.error("Error in _get_weather_json: " + str(response.status_code))
            return None

        weather = response.json()
        if w:
            # Refresh the stale row in place.
            w.results = weather
            w.last_timestamp = current_time
        else:
            w = Weather(lat_long=lat_long, results=weather)
            db.session.add(w)
        db.session.commit()
        return weather
    except requests.exceptions.RequestException as e:
        logger.error("Error in _get_weather_json: " + str(e))
        logger.error(traceback.format_exc())
        return None
def _get_cords(zipcode, timeout=10):
    """Resolve a US zipcode to coordinates, using a DB cache.

    A cached ``Zipcode`` row is used when one exists. Otherwise the
    OpenWeatherMap geocoding API is queried and the response is stored for
    later lookups.

    Args:
        zipcode: The US zipcode to resolve.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block the worker
            indefinitely.

    Returns:
        tuple: ``(lat, lon)`` on success, or ``(None, None)`` when the API
        responds with a non-200 status or the request fails.
    """
    # HTTPS keeps the API key out of cleartext traffic. The zipcode is
    # percent-encoded so characters such as "&" or "#" cannot alter the query.
    url = "https://api.openweathermap.org/geo/1.0/zip?zip={0},US&appid={1}".format(
        parse.quote(str(zipcode), safe=""), env_OWM_KEY
    )
    try:
        z = Zipcode.query.filter_by(zip=zipcode).first()
        if z:
            logger.info("Zipcode cache hit!")
            return z.results["lat"], z.results["lon"]

        logger.info("Zipcode cache miss!")
        # requests.Timeout is a RequestException, so it is handled below.
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            logger.error("Error in _get_cords: " + str(response.status_code))
            return None, None

        locale = response.json()
        logger.info(locale)
        new_z = Zipcode(zip=zipcode, results=locale)
        db.session.add(new_z)
        db.session.commit()
        return locale["lat"], locale["lon"]
    except requests.exceptions.RequestException as e:
        logger.error("Error in _get_cords: " + str(e))
        logger.error(traceback.format_exc())
        return None, None