Answers to CS50x 2019 Problem Set 8: SQL.
CS50 Finance
application.py solution
import os
from cs50 import SQL
from flask import Flask, flash, jsonify, redirect, render_template, request, session
from flask_session import Session
from tempfile import mkdtemp
from werkzeug.exceptions import default_exceptions, HTTPException, InternalServerError
from werkzeug.security import check_password_hash, generate_password_hash
from helpers import apology, login_required, lookup, usd
# Configure application
app = Flask(__name__)
# Ensure templates are auto-reloaded
app.config["TEMPLATES_AUTO_RELOAD"] = True
# Ensure responses aren't cached
@app.after_request
def after_request(response):
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Expires"] = 0
response.headers["Pragma"] = "no-cache"
return response
# Custom filter
app.jinja_env.filters["usd"] = usd
# Configure session to use filesystem (instead of signed cookies)
app.config["SESSION_FILE_DIR"] = mkdtemp()
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
Session(app)
# Configure CS50 Library to use SQLite database
db = SQL("sqlite:///finance.db")
# Make sure API key is set
if not os.environ.get("API_KEY"):
raise RuntimeError("API_KEY not set")
@app.route("/")
@login_required
def index():
"""Show portfolio of stocks"""
# access user's cash in portfolio
updated_cash = db.execute("SELECT cash FROM users \
WHERE id=:id", id=session["user_id"])
# save grouped shares into stocks
stocks = db.execute(
"SELECT symbol, SUM(shares) as total_shares FROM portfolio WHERE id = :user_id \
GROUP BY symbol HAVING total_shares > 0", user_id=session["user_id"])
quotes = {}
# create a temporary variable to store TOTAL worth ( cash + share)
total_cash = 0
for stock in stocks:
quotes[stock["symbol"]] = lookup(stock["symbol"])
portfolio_symbols = db.execute("SELECT shares, symbol \
FROM portfolio WHERE id = :id", \
id=session["user_id"])
# update each symbol prices and total
for portfolio_symbol in portfolio_symbols:
symbol = portfolio_symbol["symbol"]
shares = portfolio_symbol["shares"]
stock = lookup(symbol)
total = shares * stock["price"]
total_cash += total
# update total cash -> cash + shares worth
total_cash += updated_cash[0]["cash"]
return render_template("index.html", stocks=stocks, quotes=quotes, \
cash=usd(updated_cash[0]["cash"]), total= usd(total_cash) )
@app.route("/buy", methods=["GET", "POST"])
@login_required
def buy():
"""Buy shares of stock"""
# User reached route via POST (as by submitting a form via POST)
if request.method == "POST":
# Retrieve ticker
symbol = request.form.get("symbol")
# Retrieve shares
shares = int(request.form.get("shares"))
# Lookup quote
x = lookup(symbol)
# Apologize if symbol left blank
if not symbol:
return apology("quote cannot be blank", 400)
# Apologize if shares left blank
if not shares:
return apology("shares cannot be blank", 400)
# Apologize if symbol does not exit
if not x:
return apology("ticker is not valid", 400)
# Check if shares was a positive integer
try:
shares = int(request.form.get("shares"))
except:
return apology("shares must be a positive integer", 400)
# Check cash balance
capital = db.execute("SELECT cash FROM users WHERE id = :id", \
id = session["user_id"])
# How much $$$ the user still has in her account
cash_remaining = capital[0]["cash"]
price_per_share = x["price"]
# Calculate the price of requested shares
total_price = price_per_share * shares
if total_price > cash_remaining:
return apology("not enough funds")
# Book keeping (TODO: should be wrapped with a transaction)
db.execute("UPDATE users SET cash = cash - :total WHERE id = :user_id", total=total_price, user_id=session["user_id"])
db.execute("INSERT INTO portfolio (id, symbol, name, shares, price, total, date) VALUES(:id, :symbol, :name, :shares, :price, :total, datetime('now'))",\
id=session["user_id"],
symbol=x["symbol"],
name=x["name"],
shares=shares,
price=usd(x["price"]),
total=total_price)
flash("Bought!")
# Return to homepage
return redirect("/")
# User reached route via GET (as by clicking a link or via redirect)
else:
return render_template("buy.html")
@app.route("/check", methods=["GET"])
def check():
"""Return true if username available, else false, in JSON format"""
WORDS = db.execute("SELECT username FROM users WHERE id=:user_id", user_id=session["user_id"])
q = request.args.get("q")
words = [word for word in WORDS if q and word.startswith(q)]
return jsonify(words)
@app.route("/history")
@login_required
def history():
"""Show history of transactions"""
# Get all the transactions from the user
rows = db.execute("SELECT * FROM portfolio WHERE id = :user_id", user_id=session["user_id"])
return render_template("history.html", rows=rows)
@app.route("/login", methods=["GET", "POST"])
def login():
"""Log user in"""
# Forget any user_id
session.clear()
# User reached route via POST (as by submitting a form via POST)
if request.method == "POST":
# Ensure username was submitted
if not request.form.get("username"):
return apology("must provide username", 400)
# Ensure password was submitted
elif not request.form.get("password"):
return apology("must provide password", 400)
# Query database for username
rows = db.execute("SELECT * FROM users WHERE username = :username",
username=request.form.get("username"))
# Ensure username exists and password is correct
if len(rows) != 1 or not check_password_hash(rows[0]["hash"], request.form.get("password")):
return apology("invalid username and/or password", 400)
# Remember which user has logged in
session["user_id"] = rows[0]["id"]
# Redirect user to home page
return redirect("/")
# User reached route via GET (as by clicking a link or via redirect)
else:
return render_template("login.html")
@app.route("/logout")
def logout():
"""Log user out"""
# Forget any user_id
session.clear()
# Redirect user to login form
return redirect("/")
@app.route("/quote", methods=["GET", "POST"])
@login_required
def quote():
"""Get stock quote."""
# User reached route via POST (as by submitting a form via POST)
if request.method == "POST":
# Retrieve form submission data
quote = request.form.get("symbol")
# Apologize if username or password left blank
if not quote:
return apology("quote cannot be blank", 400)
x = lookup(quote)
# Apologize if ticker is not valid
if not x:
return apology("ticker is invalid", 400)
return render_template("quoted.html", x=x)
# User reached route via GET (as by clicking a link or via redirect)
else:
return render_template("quote.html")
@app.route("/register", methods=["GET", "POST"])
def register():
"""Register user"""
# User reached route via POST (as by submitting a form via POST)
if request.method == "POST":
# Retrieve form submission data
username = request.form.get("username")
password = request.form.get("password")
confirmation = request.form.get("confirmation")
# Ensure username was submitted
if not username:
return apology("must provide username", 400)
# Ensure password was submitted
elif not password:
return apology("must provide password", 400)
# Ensure password-confirmation was submitted
elif not confirmation:
return apology("must provide confirmation password", 400)
# Ensure passwords & password check are the same
elif not password == confirmation:
return apology("the passwords must be identical", 400)
# Ensure username is not already taken
namecheck = db.execute("SELECT * FROM users WHERE username = :username",
username=request.form.get("username"))
if len(namecheck) == 1:
return apology("username already taken. please choose another", 400)
# Hash the password
hash = generate_password_hash(password)
# Add new user to the database
rows = db.execute("INSERT INTO users (username, hash) VALUES (:username, :hash)", username=request.form.get("username"), hash=hash)
# Login automatically
session["user_id"] = rows
# Direct to index
return redirect("/")
# User reached route via GET (as by clicking a link or via redirect)
else:
return render_template("register.html")
@app.route("/sell", methods=["GET", "POST"])
@login_required
def sell():
"""Sell shares of stock"""
# User reached route via POST (as by submitting a form via POST)
if request.method == "POST":
quote = lookup(request.form.get("symbol"))
# Check if the symbol exists
if quote == None:
return apology("invalid symbol", 400)
# Check if shares was a positive integer
try:
shares = int(request.form.get("shares"))
except:
return apology("shares must be a positive integer", 400)
# Check if we have enough shares
stock = db.execute("SELECT SUM(shares) as total_shares FROM portfolio WHERE id = :user_id AND \
symbol = :symbol GROUP BY symbol", \
user_id=session["user_id"], \
symbol=request.form.get("symbol"))
if len(stock) != 1 or stock[0]["total_shares"] <= 0 or stock[0]["total_shares"] < shares:
return apology("you can't sell less than 0 or more than you own", 400)
# Calculate the value of the requested shares
price_per_share = quote["price"]
total_price = price_per_share * shares
# Book keeping
db.execute("UPDATE users SET cash = cash + :price WHERE id = :user_id", price=total_price, user_id=session["user_id"])
db.execute("INSERT INTO portfolio (id, symbol, name, shares, price, total, date) VALUES(:id, :symbol, :name, :shares, :price, :total, datetime('now'))",\
id=session["user_id"],
symbol=quote["symbol"],
name=quote["name"],
shares=-(shares),
price=usd(quote["price"]),
total=total_price)
flash("Sold!")
# Redirect user to home page
return redirect("/")
# User reached route via GET (as by clicking a link or via redirect)
else:
stocks = db.execute(
"SELECT symbol, SUM(shares) as total_shares FROM portfolio WHERE id = :user_id GROUP BY symbol HAVING total_shares > 0", user_id=session["user_id"])
return render_template("sell.html", stocks=stocks)
def errorhandler(e):
"""Handle error"""
if not isinstance(e, HTTPException):
e = InternalServerError()
return apology(e.name, e.code)
# Listen for errors
for code in default_exceptions:
app.errorhandler(code)(errorhandler)