Veriseti olarak MovieLens verisetini kullanacağız. Bu örnekte kullanmak için küçük olan verisetini indirdim.
Tavsiye sistemi için Alternating Least Squares (ALS) metodunu kullanacağız.
İlk olarak, herzamanki gibi, gerekli fonksiyon ve modülleri içeri aktardık:
from pyspark.sql import SparkSession from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.recommendation import ALS from pyspark.sql import Row from pyspark.sql.functions import lit
Yeni bir Spark oturumu oluşturuyoruz:
spark = SparkSession.builder.appName("MovieRecs").getOrCreate()
Kullandığımız işletim sistemi Türkçe ise, bir lokalizasyon problemi ortaya çıkmaktadır. Bu sorunu aşmak için aşağıdaki kodları programımıza ekliyoruz:
locale = spark.sparkContext._jvm.java.util.Locale locale.setDefault(locale.forLanguageTag("en-US"))
Verimizi içeri aktarıyor ve verimizi içeren bir DataFrame oluşturuyoruz:
lines = spark.read.text("ml-latest-small/ratings.csv").rdd parts = lines.map(lambda row: row.value.split(",")) ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]) ,movieId=int(p[1]), rating=float(p[2]), timestamp=int(p[3]))) ratings = spark.createDataFrame(ratingsRDD)
Verimizi %80 eğitim ve %20 test verisi olmak üzere ikiye ayırıyoruz:
(training, test) = ratings.randomSplit([0.8, 0.2])
Modelimizi oluşturuyoruz ve eğitim verimizle eğitiyoruz:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating") model = als.fit(training)
Test verisi üzerinde modelin tahminlerine bakıyoruz ve RMSE ile modeli değerlendiriyoruz:
predictions = model.transform(test) predictions = predictions.dropna() evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") rmse = evaluator.evaluate(predictions) print("Root-mean-square error: {}".format(rmse))
Şimdi herhangi bir kullanıcı için film tavsiyeleri oluşturalım. Daha iyi bir tavsiyede bulunabilmek için en az 100 oy almış filmleri seçiyoruz. Daha sonra kullanıcı id’si 66 olan kullanıcı için bir DataFrame oluşturup modelimizi bunun üzerinde çalıştırıyoruz ve tahminleri oluşturuyoruz:
ratingCounts = ratings.groupBy("movieId").count().filter("count > 100") popularMovies = ratingCounts.select("movieId").withColumn('userId', lit(66)) recommendations = model.transform(popularMovies) topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)
Programımızı
from pyspark.sql import SparkSession from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.recommendation import ALS from pyspark.sql import Row from pyspark.sql.functions import lit if __name__ == '__main__': spark = SparkSession.builder.appName("MovieRecs").getOrCreate() locale = spark.sparkContext._jvm.java.util.Locale locale.setDefault(locale.forLanguageTag("en-US")) lines = spark.read.text("ml-latest-small/ratings.csv").rdd parts = lines.map(lambda row: row.value.split(",")) ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]) ,movieId=int(p[1]), rating=float(p[2]), timestamp=int(p[3]))) ratings = spark.createDataFrame(ratingsRDD) (training, test) = ratings.randomSplit([0.8, 0.2]) als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating") model = als.fit(training) predictions = model.transform(test) predictions = predictions.dropna() evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") rmse = evaluator.evaluate(predictions) print("Root-mean-square error: {}".format(rmse)) print("\nTop 20 recommendations:") ratingCounts = ratings.groupBy("movieId").count().filter("count > 100") popularMovies = ratingCounts.select("movieId").withColumn('userId', lit(66)) recommendations = model.transform(popularMovies) topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20) for recommendation in topRecommendations: print(recommendation) spark.stop()
movieRecs.py adı ile kaydedip
spark-submit movieRecs.py
komutu ile çalıştırıyoruz.
Bir sonraki yazıda görüşmek üzere.
Kaynaklar