""" database dependencies to support sqliteDB examples """
from random import randrange
from datetime import date
import os, base64
import json
from sqlalchemy.exc import IntegrityError
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask import Blueprint, request, jsonify
from flask_restful import Api, Resource # used for REST API building
from datetime import datetime
import sqlite3
# Setup of key Flask object (app)
app = Flask(__name__)

# Setup SQLAlchemy object and properties for the database (db)
database = 'sqlite:///QA.db'  # SQLAlchemy URI: path and filename of database
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = database
# Prefer a SECRET_KEY supplied through the environment so a real deployment
# does not run with a value that is checked into source control; the literal
# is kept only as a fallback so existing local setups behave as before.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'SECRET_KEY')
db = SQLAlchemy()

# This belongs in place where it runs once per project
db.init_app(app)
''' Tutorial: https://www.sqlalchemy.org/library.html#tutorials, try to get into Python shell and follow along '''
# Define the Post class to manage actions in the 'posts' table, with a relationship to the 'qAs' table
class Post(db.Model):
    """A note stored in the 'posts' table and owned by one row of 'qAs'."""

    __tablename__ = 'posts'
    __table_args__ = {'extend_existing': True}

    # Define the Notes schema
    id = db.Column(db.Integer, primary_key=True)
    note = db.Column(db.Text, unique=False, nullable=False)
    image = db.Column(db.String, unique=False)
    # Foreign key to the owning 'qAs' row, many-to-one (many notes to one QA)
    userID = db.Column(db.Integer, db.ForeignKey('qAs.id'))

    def __init__(self, id, note, image):
        """Initialize the instance variables of a note.

        Args:
            id: id of the owning 'qAs' row. It is stored in ``userID``; the
                note's own primary key is assigned by the database.
            note: text of the note.
            image: value stored in the ``image`` column.
        """
        self.userID = id
        self.note = note
        self.image = image

    def __repr__(self):
        """Return a string representation, similar to Java's toString()."""
        # An f-string formats every field, so a note that is still None
        # cannot make repr() itself raise.
        return f"Notes({self.id},{self.note},{self.userID})"

    def create(self):
        """CRUD create: add this note to the table.

        Returns:
            The object that was added, or None when the commit fails with an
            IntegrityError.
        """
        try:
            db.session.add(self)  # add prepares to persist the object
            db.session.commit()  # "unit of work" pattern requires a manual commit
            return self
        except IntegrityError:
            # Roll back the failed transaction but keep the session, so other
            # objects already loaded through it stay attached and usable.
            db.session.rollback()
            return None

    def read(self):
        """CRUD read: return a dictionary representation of the note.

        Only column values are returned. The image is reported by its stored
        name and the file itself is not opened, so this works without an
        UPLOAD_FOLDER setting and leaves no file handle behind.
        """
        return {
            "id": self.id,
            "userID": self.userID,
            "note": self.note,
            "image": self.image,
        }
# Define the QA class to manage actions in the 'qAs' table
# -- Object Relational Mapping (ORM) is the key concept of SQLAlchemy
# -- a.) db.Model is like an inner layer of the onion in ORM
# -- b.) QA represents the data we want to store, something that is built on db.Model
# -- c.) SQLAlchemy ORM is a layer on top of SQLAlchemy Core, then the SQLAlchemy engine, then SQL
class QA(db.Model):
    """A quiz question with one correct and three incorrect answers.

    Rows are stored in the 'qAs' table. Each row can own several Post notes
    through the ``posts`` relationship.
    """

    __tablename__ = 'qAs'
    __table_args__ = {'extend_existing': True}

    # Schema: all five text columns are required and declared as String(255)
    id = db.Column(db.Integer, primary_key=True)
    _question = db.Column(db.String(255), unique=False, nullable=False)
    _correctAnswer = db.Column(db.String(255), unique=False, nullable=False)
    _incorrectAnswer1 = db.Column(db.String(255), unique=False, nullable=False)
    _incorrectAnswer2 = db.Column(db.String(255), unique=False, nullable=False)
    _incorrectAnswer3 = db.Column(db.String(255), unique=False, nullable=False)

    # One-to-many relationship to the Post table (one QA to many notes)
    posts = db.relationship("Post", cascade='all, delete', backref='users', lazy=True)

    def __init__(self, question, correctAnswer, incorrectAnswer1, incorrectAnswer2, incorrectAnswer3):
        """Initialize the question text and its four answer options."""
        self._question = question
        self._correctAnswer = correctAnswer
        self._incorrectAnswer1 = incorrectAnswer1
        self._incorrectAnswer2 = incorrectAnswer2
        self._incorrectAnswer3 = incorrectAnswer3

    @property
    def question(self):
        """The question text."""
        return self._question

    @question.setter
    def question(self, name):
        self._question = name

    @property
    def correctAnswer(self):
        """The correct answer."""
        return self._correctAnswer

    @correctAnswer.setter
    def correctAnswer(self, correctAnswer):
        self._correctAnswer = correctAnswer

    def is_correct_answer(self, answer):
        """Return True when ``answer`` equals the stored correct answer."""
        return self._correctAnswer == answer

    @property
    def incorrectAnswer1(self):
        """The first incorrect answer."""
        return self._incorrectAnswer1

    @incorrectAnswer1.setter
    def incorrectAnswer1(self, incorrectAnswer1):
        self._incorrectAnswer1 = incorrectAnswer1

    @property
    def incorrectAnswer2(self):
        """The second incorrect answer."""
        return self._incorrectAnswer2

    @incorrectAnswer2.setter
    def incorrectAnswer2(self, incorrectAnswer2):
        self._incorrectAnswer2 = incorrectAnswer2

    @property
    def incorrectAnswer3(self):
        """The third incorrect answer."""
        return self._incorrectAnswer3

    @incorrectAnswer3.setter
    def incorrectAnswer3(self, incorrectAnswer3):
        self._incorrectAnswer3 = incorrectAnswer3

    def is_uid(self, incorrectAnswer3):
        """Return True when the argument equals the third incorrect answer.

        A class can hold only one method per name, so this single definition
        keeps the comparison against ``_incorrectAnswer3`` and its parameter
        name for existing callers. Use ``is_correct_answer`` to check the
        correct answer.
        """
        return self._incorrectAnswer3 == incorrectAnswer3

    def __str__(self):
        """Return the record as a JSON string, ready for an API response."""
        return json.dumps(self.read())

    def create(self):
        """CRUD create: add this record to the table.

        Returns:
            self, or None when the commit fails with an IntegrityError.
        """
        try:
            db.session.add(self)  # add prepares to persist the object
            db.session.commit()  # "unit of work" pattern requires a manual commit
            return self
        except IntegrityError:
            # Roll back the failed transaction but keep the session, so other
            # objects already loaded through it stay attached and usable.
            db.session.rollback()
            return None

    def read(self):
        """CRUD read: return the question and answers as a dictionary.

        The ``id`` column is deliberately not part of the result.
        """
        return {
            "question": self._question,
            "correctAnswer": self._correctAnswer,
            "incorrectAnswer1": self._incorrectAnswer1,
            "incorrectAnswer2": self._incorrectAnswer2,
            "incorrectAnswer3": self._incorrectAnswer3,
        }

    def update(self, question="", correctAnswer="", incorrectAnswer1="", incorrectAnswer2="", incorrectAnswer3=""):
        """CRUD update: change the fields that were given a non-empty value.

        Empty strings (the defaults) and None leave the field untouched.

        Returns:
            self, after the change has been committed.
        """
        new_values = {
            "question": question,
            "correctAnswer": correctAnswer,
            "incorrectAnswer1": incorrectAnswer1,
            "incorrectAnswer2": incorrectAnswer2,
            "incorrectAnswer3": incorrectAnswer3,
        }
        for field, value in new_values.items():
            # Truthiness instead of len() so a None argument is skipped
            # rather than raising TypeError.
            if value:
                setattr(self, field, value)
        db.session.commit()
        return self

    def delete(self):
        """CRUD delete: remove this record from the table and commit."""
        db.session.delete(self)
        db.session.commit()
        return None
"""Database Creation and Testing """
# Builds working data for testing
def initQAs():
    """Create the database tables and seed the sample quiz questions.

    A question whose text is already stored is skipped, so running this again
    against an existing database does not insert the sample rows a second
    time (no column of 'qAs' is unique, so the database would accept them).
    Every newly inserted question gets 1 to 3 generated notes.
    """
    # (question, correct answer, incorrect answer 1, 2, 3)
    sample_rows = [
        ('What is a CPU?', 'central processing unit', 'coolant protection unit', 'unit processor', 'man idk'),
        ('What does HTML stand for?', 'Hypertext Markup Language', 'Hyperlink Maker Language', 'Hyper Mail Markup', 'Humphrey Man Table Lawnmower'),
        ('What do we use AWS for?', 'Deployment', 'Fortnite', 'Minecraft', 'Java'),
        ('How do we access Linux on our machines?', 'wsl', 'pylance', 'Mongodb', 'ReactJS'),
        ('Who is our teacher?', 'mr. yeung', 'jmort223', 'nathanial kim', 'sabine'),
        ('What does FUBAR stand for?', 'Funked up beyond repair', 'Flipped u before reprimanding', 'Family Under Bad Analogies Reviewed', 'Fun Ultra Boat Also Rollerblades'),
        ('How many sprints have we gone through this year?', '23', '1', '18', '52'),
        ('Which is NOT an example of proper PII?', 'Your car', 'House address', 'Bank codes', 'Passwords'),
        ('What do we use for our databases?', 'SQLAlchemy', 'XML', 'MongoDB', 'NodeJS'),
        ('What would constitute as backend development?', 'working with python to create things like a database.', 'using HTML to create a webpage', 'using CSS to style', 'Deployment using a platform like AWS'),
        ('What language is typically used in frontend development?', 'HTML', 'Python', 'Assembly', 'C-'),
        ('What does AWS stand for?', 'Amazon Web Services', 'AWOOOOOOOGA', 'Awsome Wall Seperator', 'Amazonian World Services'),
        ('Which of these are NOT one of the 4 roles assigned to us in CSP?', 'Deployment Manager', 'Frontend Developer', 'Backend developer', 'Scrum Master'),
    ]

    with app.app_context():
        # Create database and tables
        db.create_all()

        for question, correct, wrong1, wrong2, wrong3 in sample_rows:
            # The console update/delete functions address rows by question
            # text, so the same text is used here to detect an existing row.
            if QA.query.filter_by(_question=question).first() is not None:
                continue

            qA = QA(question=question, correctAnswer=correct, incorrectAnswer1=wrong1,
                    incorrectAnswer2=wrong2, incorrectAnswer3=wrong3)

            # randrange(1, 4) excludes the upper bound: 1, 2 or 3 notes
            for num in range(randrange(1, 4)):
                note = f"#### {qA.question} note {num}. \n Generated by test data."
                qA.posts.append(Post(id=qA.id, note=note, image='ncs_logo.png'))

            try:
                created = qA.create()
            except IntegrityError:
                db.session.rollback()
                created = None

            # QA.create() reports a failed insert by returning None instead of
            # raising, so the return value has to be checked as well.
            if created is None:
                print(f"Records exist, duplicate email, or error: {correct}")
# Create the tables and seed the sample data as soon as this module runs
initQAs()
# Rebind `database`: it held the SQLAlchemy URI above, from here on it is the
# file path handed to sqlite3.connect() by the console functions below.
# NOTE(review): the path is relative, so it presumably depends on the current
# working directory matching where the database file was created - confirm.
database = 'instance/QA.db'
def create():
    """Prompt for a question and its four answers and insert them into qAs.

    A sqlite3 error is reported on the console instead of being raised. The
    cursor and the connection are closed on every exit path.
    """
    question = input("please enter a question")
    correctAnswer = input("please enter the correct answer")
    incorrectAnswer1 = input("please enter an incorrect answer")
    incorrectAnswer2 = input("please enter another incorrect answer")
    incorrectAnswer3 = input("please enter the last incorrect answer")

    conn = sqlite3.connect(database)
    cursor = conn.cursor()
    try:
        # Placeholders keep the typed text out of the SQL statement itself
        cursor.execute(
            "INSERT INTO qAs (_question, _correctAnswer, _incorrectAnswer1, _incorrectAnswer2, _incorrectAnswer3) VALUES (?, ?, ?, ?, ?)",
            (question, correctAnswer, incorrectAnswer1, incorrectAnswer2, incorrectAnswer3),
        )
        # Commit the changes to the database
        conn.commit()
        print(f"A new question record {question} has been created")
    except sqlite3.Error as error:
        print("Error while executing the INSERT:", error)
    finally:
        # Release the handles even when something other than sqlite3.Error
        # (for example an interrupt) leaves the try block.
        cursor.close()
        conn.close()
def read():
    """Print every row of the qAs table, or a notice when it is empty.

    A sqlite3 error (for example a missing table) is reported on the console,
    matching the other console functions, and the cursor and the connection
    are closed on every exit path.
    """
    # Connect to the database file
    conn = sqlite3.connect(database)
    # Create a cursor object to execute SQL queries
    cursor = conn.cursor()
    try:
        results = cursor.execute('SELECT * FROM qAs').fetchall()
        if not results:
            print("Table is empty")
        else:
            for row in results:
                print(row)
    except sqlite3.Error as error:
        print("Error while executing the SELECT:", error)
    finally:
        cursor.close()
        conn.close()
def update():
    """Prompt for a question and replace its correct answer in qAs.

    An answer shorter than two characters is rejected and nothing is written,
    so a row never ends up with a placeholder as its correct answer. A
    sqlite3 error is reported on the console instead of being raised.
    """
    question = input("what question do you want to update?")
    correctAnswer = input("what should the correct answer be?")
    if len(correctAnswer) < 2:
        # Stop before touching the database: there is nothing valid to store
        print("invalid answer")
        return

    conn = sqlite3.connect(database)
    cursor = conn.cursor()
    try:
        # Execute an SQL command to update data in a table
        cursor.execute("UPDATE qAs SET _correctAnswer = ? WHERE _question = ?", (correctAnswer, question))
        if cursor.rowcount == 0:
            # No row carries this question text
            print(f"No question {question} was found in the table")
        else:
            print(f"The row with question {question}: correct answer updated!")
        conn.commit()
    except sqlite3.Error as error:
        print("Error while executing the UPDATE:", error)
    finally:
        cursor.close()
        conn.close()
def delete():
    """Prompt for a question and delete the matching rows from qAs.

    A sqlite3 error is reported on the console instead of being raised. The
    cursor and the connection are closed on every exit path.
    """
    question = input("enter a question to delete")
    conn = sqlite3.connect(database)
    # Create a cursor object to execute SQL commands
    cursor = conn.cursor()
    try:
        cursor.execute("DELETE FROM qAs WHERE _question = ?", (question,))
        if cursor.rowcount == 0:
            # No row carries this question text
            print(f"No question {question} was found in the table")
        else:
            # At least one matching row was deleted
            print(f"The row with {question} was successfully deleted")
        conn.commit()
    except sqlite3.Error as error:
        print("Error while executing the DELETE:", error)
    finally:
        cursor.close()
        conn.close()
# Menu, to run other cells from one control point
def menu():
    """Run the console menu until the user submits an empty line.

    Each round asks for one operation letter (case-insensitive) and runs the
    matching create/read/update/delete function. Any other non-empty input
    prints a hint and asks again.
    """
    operations = {
        'c': create,
        'r': read,
        'u': update,
        'd': delete,
    }
    # A loop instead of a recursive call: the call stack no longer grows with
    # every operation, so a long session cannot hit the recursion limit.
    while True:
        operation = input("Enter: (C)reate (R)ead (U)pdate or (D)elete")
        if len(operation) == 0:  # Escape Key
            return
        action = operations.get(operation.lower())
        if action is None:
            print("Please enter c, r, u, or d")
        else:
            action()
try:
    menu()  # start menu
except Exception as error:
    # Only ordinary errors are handled here; KeyboardInterrupt and SystemExit
    # propagate so the user can still interrupt or exit the program.
    print("Perform Jupyter 'Run All' prior to starting menu")
    # Show what actually went wrong instead of discarding it
    print(f"Menu stopped because of: {error!r}")
# def create():
# body = request.get_json()
# question = request.get("question")
# correctAnswer = body.get("correctAnswer")
# incorrectAnswer1 = body.get("incorrectAnswer1")
# incorrectAnswer2 = body.get("incorrectAnswer2")
# incorrectAnswer3 = body.get("incorrectAnswer3")
# uo = QA(question = question,
# incorrectAnswer1= incorrectAnswer1,
# incorrectAnswer2= incorrectAnswer2,
# incorrectAnswer3= incorrectAnswer3)
# qA = uo.create()
# if qA:
# return jsonify(qA.read())
# else:
# return {'message': f'Processed {question}, format error'}, 400
# read()
# with app.app_context:
# qAs = QA.query.all()
# random.shuffle(qAs)
# qA = qAs[0]
# return jsonify(qA.read())