Spark ile Film Tavsiye Sistemi

Veriseti olarak MovieLens  verisetini kullanacağız. Bu örnekte kullanmak için küçük olan verisetini indirdim.

Tavsiye sistemi için Alternating Least Squares (ALS) metodunu kullanacağız.

İlk olarak, herzamanki gibi, gerekli fonksiyon ve modülleri içeri aktardık:

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

Yeni bir Spark oturumu oluşturuyoruz:

spark = SparkSession.builder.appName("MovieRecs").getOrCreate()

Kullandığımız işletim sistemi Türkçe ise, bir lokalizasyon problemi ortaya çıkmaktadır. Bu sorunu aşmak için aşağıdaki kodları programımıza ekliyoruz:

locale = spark.sparkContext._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))

Verimizi içeri aktarıyor ve verimizi içeren bir DataFrame oluşturuyoruz:

lines = spark.read.text("ml-latest-small/ratings.csv").rdd
parts = lines.map(lambda row: row.value.split(","))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]) ,movieId=int(p[1]), rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

Verimizi %80 eğitim ve %20 test verisi olmak üzere ikiye ayırıyoruz:

(training, test) = ratings.randomSplit([0.8, 0.2])

Modelimizi oluşturuyoruz ve eğitim verimizle eğitiyoruz:

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)

Test verisi üzerinde modelin tahminlerine bakıyoruz ve RMSE ile modeli değerlendiriyoruz:

predictions = model.transform(test)
predictions = predictions.dropna()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error: {}".format(rmse))

Şimdi herhangi bir kullanıcı için film tavsiyeleri oluşturalım. Daha iyi bir tavsiyede bulunabilmek için en az 100 oy almış filmleri seçiyoruz. Daha sonra kullanıcı id’si 66 olan kullanıcı için bir DataFrame oluşturup modelimizi bunun üzerinde çalıştırıyoruz ve tahminleri oluşturuyoruz:

ratingCounts = ratings.groupBy("movieId").count().filter("count > 100")
popularMovies = ratingCounts.select("movieId").withColumn('userId', lit(66))
recommendations = model.transform(popularMovies)
topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)

Programımızı

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

if __name__ == '__main__':
    spark = SparkSession.builder.appName("MovieRecs").getOrCreate()

    locale = spark.sparkContext._jvm.java.util.Locale
    locale.setDefault(locale.forLanguageTag("en-US"))

    lines = spark.read.text("ml-latest-small/ratings.csv").rdd
    parts = lines.map(lambda row: row.value.split(","))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]) ,movieId=int(p[1]), rating=float(p[2]), timestamp=int(p[3])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2])

    als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
    model = als.fit(training)

    predictions = model.transform(test)
    predictions = predictions.dropna()
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error: {}".format(rmse))

    print("\nTop 20 recommendations:")
    ratingCounts = ratings.groupBy("movieId").count().filter("count > 100")
    popularMovies = ratingCounts.select("movieId").withColumn('userId', lit(66))

    recommendations = model.transform(popularMovies)

    topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)

    for recommendation in topRecommendations:
        print(recommendation)

    spark.stop()

movieRecs.py adı ile kaydedip

spark-submit movieRecs.py

komutu ile çalıştırıyoruz.

Bir sonraki yazıda görüşmek üzere.

Kaynaklar

  1. Collaborative Filtering – RDD-based API.
  2. Java String toLowerCase “Turkish locale bug” causes Spark problems

Yorum bırakın

Bu site, istenmeyenleri azaltmak için Akismet kullanıyor. Yorum verilerinizin nasıl işlendiği hakkında daha fazla bilgi edinin.