from datetime import datetime, timedelta, timezone
import json
from urllib import parse
import logging
import re
import typing as t
import requests
import traceback
import phonenumbers
from phonenumbers import timezone as pntz
import pytz
from models import db, Stats, Weather, Zipcode
from config import env_OWM_KEY, env_OWM_UNITS, env_CACHE_TIME

logger = logging.getLogger("gunicorn.error")

# Sentence template for the current-day report; the six positional slots are
# filled by _get_weather (temp, feels-like, high, low, humidity, summary).
weather_template = "The current temperature is: {0}. The real feel temperature is: {1}. The high is: {2}. The low is: {3}. The current humidity is: {4} percent. The summary for today is: {5}."


def str_none(x):
    """Return ``str(x)``, or an empty string when *x* is ``None``."""
    return "" if x is None else str(x)


def string_validator(input_str: str):
    """Percent-decode and sanitize *input_str*.

    All whitespace is removed, followed by the characters
    ``< > " ' % ;``.

    Returns:
        The sanitized string, or ``None`` when nothing is left after
        sanitizing.
    """
    unquoted = parse.unquote(input_str)
    # Whitespace is stripped first, then the disallowed punctuation.
    without_whitespace = re.sub(r"[\s]", "", unquoted)
    cleaned = re.sub(r'[<>"\'%;]', "", without_whitespace)
    return cleaned if cleaned else None


def validate_data_presence(data: t.Dict[str, t.Any], keys: list[str]) -> bool:
    """Report whether every entry of *keys* is present in *data*.

    Args:
        data (Dict[str, Any]): The mapping to inspect.
        keys (list[str]): The keys that must all be present.

    Returns:
        bool: True when no key is missing (including when *keys* is
        empty), False as soon as one key is absent.
    """
    return all(key in data for key in keys)


def strq(str: str) -> str:
    """Return *str* wrapped in double quotes."""
    # join() keeps the original contract: non-string input raises TypeError.
    return "".join(('"', str, '"'))


def _get_time(phone_number):
    """Return a sentence giving the current local time for *phone_number*.

    The local time comes from ``_get_phone_time``. Any exception is logged
    (message plus traceback) and a fixed apology sentence is returned
    instead.
    """
    try:
        local_now = _get_phone_time(phone_number)
        spoken_time = local_now.strftime("The time is %I:%M %p.")
        return spoken_time
    except Exception as e:
        logger.error(f"Error in _get_time: {e!s}")
        logger.error(traceback.format_exc())
        return "An error has occured and the time could not be retrieved."
# Upper bound, in seconds, for each outbound OpenWeatherMap HTTP request.
# Without a timeout, requests.get() can block the calling worker indefinitely.
_OWM_TIMEOUT_SECONDS = 10


def _get_date(phone_number):
    """Return a sentence giving today's local date for *phone_number*.

    The local time comes from ``_get_phone_time``. Any exception is logged
    (message plus traceback) and a fixed apology sentence is returned
    instead.
    """
    try:
        time_object = _get_phone_time(phone_number)
        return time_object.strftime("Today is %A, %B %d.")
    except Exception as e:
        logger.error("Error in _get_date: " + str(e))
        logger.error(traceback.format_exc())
        return "An error has occured and the date could not be retrieved."


def _get_phone_time(phone_number):
    """Return the current timezone-aware datetime for *phone_number*.

    The number is parsed with "US" as the default region and the first time
    zone reported for it is used. If the number cannot be parsed, is not
    valid, has no time zone, or the zone name cannot be resolved by pytz,
    the error is logged and America/New_York is used instead.
    """
    try:
        number = phonenumbers.parse(phone_number, "US")
        if not phonenumbers.is_valid_number(number):
            raise TypeError("Number not valid")
        # Resolve the zone inside the try so an unresolvable zone name also
        # takes the fallback path instead of escaping to the caller.
        tz = pytz.timezone(pntz.time_zones_for_number(number)[0])
    except Exception as e:
        logger.error("Error parsing phone number " + str(phone_number) + " : " + str(e))
        # Default to America/New_York if there's an error during conversion
        tz = pytz.timezone("America/New_York")
    return datetime.now(timezone.utc).astimezone(tz)


def _get_weather(day, lat, long):
    """Build the spoken weather report for a location.

    Args:
        day: Offset in days from today; 0 selects the current-conditions
            report, a positive value selects that entry of the daily
            forecast.
        lat: Latitude passed through to ``_get_weather_json``.
        long: Longitude passed through to ``_get_weather_json``.

    Returns:
        str: The report sentence, or a fixed apology sentence when an
        argument is missing or negative, the weather data is unavailable,
        or any exception occurs.

    On success a ``Stats`` row for the "lat,long" pair is committed.
    """
    try:
        if day is None or lat is None or long is None:
            return "An error has occured and the request could not be fulfilled."
        # A negative offset would index the daily list from its end and pair
        # a far-future forecast with a past weekday name, so reject it.
        if day < 0:
            return "An error has occured and the request could not be fulfilled."

        weather_json = _get_weather_json(lat, long)
        if weather_json is None:
            return "An error has occured and the weather could not be retrieved."

        if day == 0:
            current = weather_json["current"]
            today = weather_json["daily"][0]
            weather = weather_template.format(
                round(current["temp"]),
                round(current["feels_like"]),
                round(today["temp"]["max"]),
                round(today["temp"]["min"]),
                round(current["humidity"]),
                today["summary"],
            )
        else:
            # Name the weekday in the location's own time zone.
            tz = pytz.timezone(weather_json["timezone"])
            forcastdate = datetime.now(timezone.utc).astimezone(tz) + timedelta(days=day)
            forecast = weather_json["daily"][day]
            weather = "The forcast on {0} is a high of: {1} and low of: {2}. The summary is: {3}.".format(
                forcastdate.strftime("%A"),
                round(forecast["temp"]["max"]),
                round(forecast["temp"]["min"]),
                forecast["summary"],
            )

        # Record the successful lookup.
        lat_long = str(lat) + "," + str(long)
        db.session.add(Stats(lat_long=lat_long))
        db.session.commit()
        return weather
    except Exception as e:
        logger.error("Error in _get_weather: " + str(e))
        logger.error(traceback.format_exc())
        return "An error has occured and the weather could not be retrieved."


def _owm_error_detail(response):
    """Return a printable error detail for a non-200 API response.

    Uses the JSON body's "message" field when there is one; otherwise falls
    back to the raw response text, so building the log line cannot raise.
    """
    try:
        payload = response.json()
    except ValueError:
        return response.text
    if isinstance(payload, dict) and "message" in payload:
        return str(payload["message"])
    return response.text


def _get_weather_json(lat, long):
    """Return the One Call weather payload for a coordinate pair.

    A ``Weather`` row keyed by "lat,long" is used as a cache: when its
    ``last_timestamp`` (read as UTC) is at most ``env_CACHE_TIME`` minutes
    old, the stored results are returned. Otherwise the API is queried and
    the row is created or refreshed.

    Returns:
        The decoded JSON payload, or ``None`` when the API answers with a
        non-200 status or the request fails.
    """
    url = "https://api.openweathermap.org/data/3.0/onecall?lat={0}&lon={1}&exclude=alerts,minutely,hourly&units={2}&appid={3}".format(
        lat, long, env_OWM_UNITS, env_OWM_KEY
    )
    lat_long = str(lat) + "," + str(long)
    current_time = datetime.now(timezone.utc)
    try:
        w = Weather.query.filter(Weather.lat_long == lat_long).first()
        if w and w.last_timestamp.replace(
            tzinfo=timezone.utc
        ) >= current_time - timedelta(minutes=env_CACHE_TIME):
            logger.info("Weather cache hit!")
            return w.results

        logger.info("Weather cache miss!")
        response = requests.get(url, timeout=_OWM_TIMEOUT_SECONDS)
        if response.status_code != 200:
            logger.error(
                "Error in _get_weather_json: API call returned status code "
                + str(response.status_code)
                + ". "
                + _owm_error_detail(response)
            )
            return None

        weather = response.json()
        if w:
            w.results = weather
            w.last_timestamp = current_time
        else:
            w = Weather(lat_long=lat_long, results=weather)
            db.session.add(w)
        db.session.commit()
        return weather
    except requests.exceptions.RequestException as e:
        logger.error("Error in _get_weather_json: " + str(e))
        logger.error(traceback.format_exc())
        return None


def _get_cords(zipcode):
    """Return ``(lat, lon)`` for a US zip code.

    A ``Zipcode`` row is used as a cache; on a miss the geocoding API is
    queried and the response is stored.

    Returns:
        tuple: ``(lat, lon)`` on success, ``(None, None)`` when the API
        answers with a non-200 status or the request fails.
    """
    # HTTPS, because the query string carries the API key.
    url = "https://api.openweathermap.org/geo/1.0/zip?zip={0},US&appid={1}".format(
        zipcode, env_OWM_KEY
    )
    try:
        z = Zipcode.query.filter_by(zip=zipcode).first()
        if z:
            logger.info("Zipcode cache hit!")
            return z.results["lat"], z.results["lon"]

        logger.info("Zipcode cache miss!")
        response = requests.get(url, timeout=_OWM_TIMEOUT_SECONDS)
        if response.status_code != 200:
            logger.error("Error in _get_cords: " + str(response.status_code))
            return None, None

        locale = response.json()
        logger.info(locale)
        # Read the coordinates before caching so a payload without them is
        # never stored and then served from the cache on every later call.
        lat, lon = locale["lat"], locale["lon"]
        db.session.add(Zipcode(zip=zipcode, results=locale))
        db.session.commit()
        return lat, lon
    except requests.exceptions.RequestException as e:
        logger.error("Error in _get_cords: " + str(e))
        logger.error(traceback.format_exc())
        return None, None