208 lines
6.7 KiB
Python
208 lines
6.7 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
import json
|
|
from urllib import parse
|
|
import logging
|
|
import re
|
|
import typing as t
|
|
import requests
|
|
import traceback
|
|
import phonenumbers
|
|
from phonenumbers import timezone as pntz
|
|
import pytz
|
|
|
|
from models import db, Stats, Weather, Zipcode
|
|
from config import env_OWM_KEY, env_OWM_UNITS, env_CACHE_TIME
|
|
|
|
logger = logging.getLogger("gunicorn.error")
|
|
weather_template = "The current temperature is: {0}. The real feel temperature is: {1}. The high is: {2}. The low is: {3}. The current humidity is: {4} percent. The summary for today is: {5}."
|
|
|
|
|
|
def str_none(x):
|
|
if x is None:
|
|
return ""
|
|
else:
|
|
return str(x)
|
|
|
|
|
|
def string_validator(input_str: str):
|
|
# Decode the input string
|
|
decoded_str = parse.unquote(input_str)
|
|
|
|
# Sanitize the string
|
|
sanitized = re.sub(r"[\s]", "", decoded_str)
|
|
sanitized = re.sub(r'[<>"\'%;]', "", sanitized)
|
|
|
|
# Check length of the string
|
|
if len(sanitized) < 1:
|
|
return None
|
|
|
|
return sanitized
|
|
|
|
|
|
def validate_data_presence(data: t.Dict[str, t.Any], keys: list[str]) -> bool:
|
|
"""
|
|
Validate that all given keys are present in the data.
|
|
|
|
Args:
|
|
data (Dict[str, Any]): The JSON data to be validated.
|
|
keys (list[str]): A list of keys to look for in the data.
|
|
|
|
Returns:
|
|
bool: If any key is missing, returns False. Otherwise, returns True.
|
|
"""
|
|
for key in keys:
|
|
if key not in data:
|
|
return False
|
|
return True
|
|
|
|
|
|
def strq(str: str) -> str:
|
|
return '"' + str + '"'
|
|
|
|
|
|
def _get_time(phone_number):
|
|
try:
|
|
# Get and print the current time in that timezone
|
|
time_object = _get_phone_time(phone_number)
|
|
time = f"{time_object.strftime('The time is %I:%M %p.')}"
|
|
return time
|
|
except Exception as e:
|
|
logger.error("Error in _get_time: " + str(e))
|
|
logger.error(traceback.format_exc())
|
|
return "An error has occured and the time could not be retrieved."
|
|
|
|
|
|
def _get_date(phone_number):
|
|
try:
|
|
# Get and print the current date in that timezone
|
|
time_object = _get_phone_time(phone_number)
|
|
date = f"{time_object.strftime('Today is %A, %B %d.')}"
|
|
return date
|
|
except Exception as e:
|
|
logger.error("Error in _get_date: " + str(e))
|
|
logger.error(traceback.format_exc())
|
|
return "An error has occured and the date could not be retrieved."
|
|
|
|
|
|
def _get_phone_time(phone_number):
|
|
|
|
try:
|
|
# Parse the cleaned phone number into a PhoneNumber object
|
|
number = phonenumbers.parse(phone_number, "US")
|
|
|
|
valid = phonenumbers.is_valid_number(number)
|
|
if not valid:
|
|
raise TypeError("Number not valid")
|
|
|
|
# Determine the timezone based on the area code or country code
|
|
tz_str = pntz.time_zones_for_number(number)
|
|
tz_str = tz_str[0]
|
|
except Exception as e:
|
|
logger.error("Error parsing phone number " + str(phone_number) + " : " + str(e))
|
|
# Default to America/New_York if there's an error during conversion
|
|
tz_str = "America/New_York"
|
|
|
|
tz = pytz.timezone(tz_str)
|
|
|
|
# Get current datetime in the target timezone
|
|
now_utc = datetime.now(timezone.utc)
|
|
local_time = now_utc.astimezone(tz)
|
|
|
|
return local_time
|
|
|
|
|
|
def _get_weather(lat, long):
|
|
try:
|
|
if lat is None or long is None:
|
|
return (
|
|
"An error has occured and the provided zipcode could not be understood."
|
|
)
|
|
|
|
weather_json = _get_weather_json(lat, long)
|
|
if weather_json is None:
|
|
return "An error has occured and the weather could not be retrieved."
|
|
|
|
weather = weather_template.format(
|
|
round(weather_json["current"]["temp"]),
|
|
round(weather_json["current"]["feels_like"]),
|
|
round(weather_json["daily"][0]["temp"]["max"]),
|
|
round(weather_json["daily"][0]["temp"]["min"]),
|
|
round(weather_json["current"]["humidity"]),
|
|
weather_json["daily"][0]["summary"],
|
|
)
|
|
lat_long = str(lat) + "," + str(long)
|
|
s = Stats(lat_long=lat_long)
|
|
db.session.add(s)
|
|
db.session.commit()
|
|
return weather
|
|
except Exception as e:
|
|
logger.error("Error in _get_weather: " + str(e))
|
|
logger.error(traceback.format_exc())
|
|
return "An error has occured and the weather could not be retrieved."
|
|
|
|
|
|
def _get_weather_json(lat, long):
|
|
url = "https://api.openweathermap.org/data/3.0/onecall?lat={0}&lon={1}&exclude=alerts,minutely,hourly&units={2}&appid={3}".format(
|
|
lat, long, env_OWM_UNITS, env_OWM_KEY
|
|
)
|
|
lat_long = str(lat) + "," + str(long)
|
|
current_time = datetime.now(timezone.utc)
|
|
|
|
try:
|
|
w = Weather.query.filter(Weather.lat_long == lat_long).first()
|
|
if w and w.last_timestamp.replace(
|
|
tzinfo=timezone.utc
|
|
) >= current_time - timedelta(minutes=env_CACHE_TIME):
|
|
logger.info("Weather cache hit!")
|
|
return w.results
|
|
logger.info("Weather cache miss!")
|
|
|
|
response = requests.get(url)
|
|
|
|
if response.status_code == 200:
|
|
weather = response.json()
|
|
if w:
|
|
w.results = weather
|
|
w.last_timestamp = current_time
|
|
else:
|
|
w = Weather(lat_long=lat_long, results=weather)
|
|
db.session.add(w)
|
|
db.session.commit()
|
|
return weather
|
|
else:
|
|
logger.error("Error in _get_weather_json: API call returned status code " + str(response.status_code) + ". " + str(response.json()["message"]))
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error("Error in _get_weather_json: " + str(e))
|
|
logger.error(traceback.format_exc())
|
|
return None
|
|
|
|
|
|
def _get_cords(zipcode):
|
|
url = "http://api.openweathermap.org/geo/1.0/zip?zip={0},US&appid={1}".format(
|
|
zipcode, env_OWM_KEY
|
|
)
|
|
|
|
try:
|
|
z = Zipcode.query.filter_by(zip=zipcode).first()
|
|
if z:
|
|
logger.info("Zipcode cache hit!")
|
|
return z.results["lat"], z.results["lon"]
|
|
logger.info("Zipcode cache miss!")
|
|
|
|
response = requests.get(url)
|
|
|
|
if response.status_code == 200:
|
|
locale = response.json()
|
|
logger.info(locale)
|
|
new_z = Zipcode(zip=zipcode, results=locale)
|
|
db.session.add(new_z)
|
|
db.session.commit()
|
|
return locale["lat"], locale["lon"]
|
|
else:
|
|
logger.error("Error in _get_cords: " + str(response.status_code))
|
|
return None, None
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error("Error in _get_cords: " + str(e))
|
|
logger.error(traceback.format_exc())
|
|
return None, None
|