Spark ile Film Tavsiye Sistemi

Veriseti olarak MovieLens  verisetini kullanacağız. Bu örnekte kullanmak için küçük olan verisetini indirdim.

Tavsiye sistemi için Alternating Least Squares (ALS) metodunu kullanacağız.

İlk olarak, herzamanki gibi, gerekli fonksiyon ve modülleri içeri aktardık:

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

Yeni bir Spark oturumu oluşturuyoruz:

spark = SparkSession.builder.appName("MovieRecs").getOrCreate()

Kullandığımız işletim sistemi Türkçe ise, bir lokalizasyon problemi ortaya çıkmaktadır. Bu sorunu aşmak için aşağıdaki kodları programımıza ekliyoruz:

locale = spark.sparkContext._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))

Verimizi içeri aktarıyor ve verimizi içeren bir DataFrame oluşturuyoruz:

lines = spark.read.text("ml-latest-small/ratings.csv").rdd
parts = lines.map(lambda row: row.value.split(","))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]) ,movieId=int(p[1]), rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

Verimizi %80 eğitim ve %20 test verisi olmak üzere ikiye ayırıyoruz:

(training, test) = ratings.randomSplit([0.8, 0.2])

Modelimizi oluşturuyoruz ve eğitim verimizle eğitiyoruz:

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)

Test verisi üzerinde modelin tahminlerine bakıyoruz ve RMSE ile modeli değerlendiriyoruz:

predictions = model.transform(test)
predictions = predictions.dropna()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error: {}".format(rmse))

Şimdi herhangi bir kullanıcı için film tavsiyeleri oluşturalım. Daha iyi bir tavsiyede bulunabilmek için en az 100 oy almış filmleri seçiyoruz. Daha sonra kullanıcı id’si 66 olan kullanıcı için bir DataFrame oluşturup modelimizi bunun üzerinde çalıştırıyoruz ve tahminleri oluşturuyoruz:

ratingCounts = ratings.groupBy("movieId").count().filter("count > 100")
popularMovies = ratingCounts.select("movieId").withColumn('userId', lit(66))
recommendations = model.transform(popularMovies)
topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)

Programımızı

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

if __name__ == '__main__':
    spark = SparkSession.builder.appName("MovieRecs").getOrCreate()

    locale = spark.sparkContext._jvm.java.util.Locale
    locale.setDefault(locale.forLanguageTag("en-US"))

    lines = spark.read.text("ml-latest-small/ratings.csv").rdd
    parts = lines.map(lambda row: row.value.split(","))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]) ,movieId=int(p[1]), rating=float(p[2]), timestamp=int(p[3])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2])

    als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
    model = als.fit(training)

    predictions = model.transform(test)
    predictions = predictions.dropna()
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error: {}".format(rmse))

    print("\nTop 20 recommendations:")
    ratingCounts = ratings.groupBy("movieId").count().filter("count > 100")
    popularMovies = ratingCounts.select("movieId").withColumn('userId', lit(66))

    recommendations = model.transform(popularMovies)

    topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)

    for recommendation in topRecommendations:
        print(recommendation)

    spark.stop()

movieRecs.py adı ile kaydedip

spark-submit movieRecs.py

komutu ile çalıştırıyoruz.

Bir sonraki yazıda görüşmek üzere.

Kaynaklar

  1. Collaborative Filtering – RDD-based API.
  2. Java String toLowerCase “Turkish locale bug” causes Spark problems

Bir Cevap Yazın

Aşağıya bilgilerinizi girin veya oturum açmak için bir simgeye tıklayın:

WordPress.com Logosu

WordPress.com hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Google fotoğrafı

Google hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Twitter resmi

Twitter hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Facebook fotoğrafı

Facebook hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Connecting to %s